use std::sync::Arc; use std::time::Duration; use k8s_openapi::api::apps::v1::Deployment; use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Service}; use kube::api::{Api, Patch, PatchParams}; use kube::runtime::controller::Action; use crate::resources::deployment::build_minio_deployment; use crate::resources::pvc::build_minio_pvc; use crate::resources::service::build_minio_service; use crate::types::minioinstance::{MinioInstance, MinioInstanceStatus}; use crate::types::webservice::Condition; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Kube error: {0}")] Kube(#[source] kube::Error), #[error("Missing field: {0}")] MissingField(String), } pub struct Context { pub client: kube::Client, } static FIELD_MANAGER: &str = "platform-operator"; pub async fn reconcile( mi: Arc, ctx: Arc, ) -> Result { let result = reconcile_apply(&mi, &ctx).await; if let Err(ref e) = result { let _ = update_status(&ctx.client, &mi, false, &e.to_string()).await; } result } async fn reconcile_apply(mi: &MinioInstance, ctx: &Context) -> Result { let client = &ctx.client; let ns = mi .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let name = mi .metadata .name .as_deref() .ok_or_else(|| Error::MissingField("metadata.name".to_string()))?; tracing::info!(minioinstance = name, namespace = ns, "reconciling"); let pvc_api: Api = Api::namespaced(client.clone(), ns); let pvc_name = format!("{name}-data"); if pvc_api.get_opt(&pvc_name).await.map_err(Error::Kube)?.is_none() { let pvc = build_minio_pvc(mi); pvc_api .create(&Default::default(), &pvc) .await .map_err(Error::Kube)?; tracing::info!(minioinstance = name, "created pvc"); } let dep = build_minio_deployment(mi); let dep_api: Api = Api::namespaced(client.clone(), ns); dep_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&dep), ) .await .map_err(Error::Kube)?; tracing::info!(minioinstance = name, "applied deployment"); let svc = build_minio_service(mi); let svc_api: Api = Api::namespaced(client.clone(), ns); svc_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&svc), ) .await .map_err(Error::Kube)?; tracing::info!(minioinstance = name, "applied service"); update_status(client, mi, true, "all resources applied").await?; tracing::info!(minioinstance = name, "reconciled"); Ok(Action::requeue(Duration::from_secs(300))) } pub fn error_policy( _mi: Arc, error: &Error, _ctx: Arc, ) -> Action { tracing::warn!(%error, "reconcile failed"); Action::requeue(Duration::from_secs(60)) } async fn update_status( client: &kube::Client, mi: &MinioInstance, ready: bool, message: &str, ) -> Result<(), Error> { let ns = mi.metadata.namespace.as_deref().unwrap_or("default"); let api = Api::::namespaced(client.clone(), ns); let name = mi.metadata.name.as_deref().unwrap(); let now = chrono::Utc::now().to_rfc3339(); let status = MinioInstanceStatus { conditions: vec![Condition { type_: "Ready".to_string(), status: if ready { "True" } else { "False" }.to_string(), reason: if ready { "Reconciled" } else { "Error" }.to_string(), message: message.to_string(), last_transition_time: now, }], observed_generation: mi.metadata.generation, }; let patch = serde_json::json!({ "apiVersion": "irc.now/v1alpha1", "kind": "MinioInstance", "status": status, }); api.patch_status(name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch)) .await .map_err(Error::Kube)?; Ok(()) }