use std::sync::Arc; use std::time::Duration; use k8s_openapi::api::apps::v1::Deployment; use k8s_openapi::api::core::v1::{ConfigMap, PersistentVolumeClaim, Secret, Service}; use kube::api::{Api, DynamicObject, Patch, PatchParams}; use kube::core::{ApiResource, GroupVersionKind}; use kube::runtime::controller::Action; use kube::runtime::finalizer::{finalizer, Event}; use crate::resources::certificate::build_certificate; use crate::resources::configmap::build_configmap; use crate::resources::deployment::build_deployment; use crate::resources::pvc::build_pvc; use crate::resources::route::build_route; use crate::resources::service::build_service; use crate::types::{Condition, ErgoNetwork, ErgoNetworkStatus}; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Kube error: {0}")] Kube(#[source] kube::Error), #[error("Missing field: {0}")] MissingField(String), #[error("Serialization error: {0}")] Serialization(#[source] serde_json::Error), #[error("Finalizer error: {0}")] Finalizer(#[source] Box>), } pub struct Context { pub client: kube::Client, } static FIELD_MANAGER: &str = "ergo-operator"; static FINALIZER_NAME: &str = "irc.now/ergo-cleanup"; pub async fn reconcile( network: Arc, ctx: Arc, ) -> Result { let client = &ctx.client; let ns = network .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let api: Api = Api::namespaced(client.clone(), ns); finalizer(&api, FINALIZER_NAME, network, |event| async { match event { Event::Apply(network) => { let result = reconcile_apply(&network, &ctx).await; if let Err(ref e) = result { let _ = update_status(&ctx.client, &network, false, &e.to_string()).await; } result } Event::Cleanup(network) => reconcile_cleanup(&network, &ctx).await, } }) .await .map_err(|e| Error::Finalizer(Box::new(e))) } async fn reconcile_cleanup(network: &ErgoNetwork, ctx: &Context) -> Result { let client = &ctx.client; let ns = network.metadata.namespace.as_deref().unwrap_or("default"); let name = network.metadata.name.as_deref().unwrap_or("unknown"); let pvc_name = format!("{name}-data"); let pvc_api: Api = Api::namespaced(client.clone(), ns); if let Err(e) = pvc_api.delete(&pvc_name, &Default::default()).await { tracing::warn!(network = name, "failed to delete pvc: {e}"); } else { tracing::info!(network = name, "deleted pvc {pvc_name}"); } Ok(Action::await_change()) } async fn reconcile_apply(network: &ErgoNetwork, ctx: &Context) -> Result { let client = &ctx.client; let ns = network .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let name = network .metadata .name .as_deref() .ok_or_else(|| Error::MissingField("metadata.name".to_string()))?; tracing::info!(network = name, namespace = ns, "reconciling"); let oper_hash = read_oper_hash(client, ns, network).await?; let oauth2_secret = read_oauth2_secret(client, ns, network).await?; let pvc_api: Api = Api::namespaced(client.clone(), ns); let pvc_name = format!("{name}-data"); if pvc_api.get_opt(&pvc_name).await.map_err(Error::Kube)?.is_none() { let pvc = build_pvc(network); pvc_api .create(&Default::default(), &pvc) .await .map_err(Error::Kube)?; tracing::info!(network = name, "created pvc"); } if network.spec.tls.is_some() { let cert = build_certificate(network); let cert_api: Api = Api::namespaced(client.clone(), ns); let cert_name = cert.metadata.name.as_deref().unwrap(); cert_api .patch( cert_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&cert), ) .await .map_err(Error::Kube)?; tracing::info!(network = name, "applied certificate"); } let cm = build_configmap(network, oper_hash.as_deref(), oauth2_secret.as_deref()); let cm_api: Api = Api::namespaced(client.clone(), ns); let cm_name = cm.metadata.name.as_deref().unwrap(); cm_api .patch( cm_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&cm), ) .await .map_err(Error::Kube)?; tracing::info!(network = name, "applied configmap"); let config_data = cm.data.as_ref().and_then(|d| d.get("ircd.yaml")).cloned().unwrap_or_default(); let config_hash = { use sha2::{Digest, Sha256}; let hash = Sha256::digest(config_data.as_bytes()); format!("{:x}", hash) }; let dep = build_deployment(network, &config_hash); let dep_api: Api = Api::namespaced(client.clone(), ns); dep_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&dep), ) .await .map_err(Error::Kube)?; tracing::info!(network = name, "applied deployment"); let svc = build_service(network); let svc_api: Api = Api::namespaced(client.clone(), ns); svc_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&svc), ) .await .map_err(Error::Kube)?; tracing::info!(network = name, "applied service"); if network.spec.route.is_some() { let route = build_route(network); let gvk = GroupVersionKind::gvk("route.openshift.io", "v1", "Route"); let ar = ApiResource::from_gvk_with_plural(&gvk, "routes"); let route_api: Api = Api::namespaced_with(client.clone(), ns, &ar); let patch_value = serde_json::json!({ "apiVersion": "route.openshift.io/v1", "kind": "Route", "metadata": route.metadata, "spec": route.spec, }); route_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch_value), ) .await .map_err(Error::Kube)?; tracing::info!(network = name, "applied route"); } update_status(client, network, true, "all resources applied").await?; tracing::info!(network = name, "reconciled"); Ok(Action::requeue(Duration::from_secs(300))) } async fn read_oper_hash( client: &kube::Client, ns: &str, network: &ErgoNetwork, ) -> Result, Error> { let oper_creds = match &network.spec.oper_credentials { Some(c) => c, None => return Ok(None), }; let secret_api: Api = Api::namespaced(client.clone(), ns); let secret = secret_api .get(&oper_creds.secret_name) .await .map_err(Error::Kube)?; if let Some(data) = &secret.data { if let Some(hash_bytes) = data.get("password-hash") { return Ok(Some(String::from_utf8_lossy(&hash_bytes.0).to_string())); } } if let Some(string_data) = &secret.string_data { if let Some(hash) = string_data.get("password-hash") { return Ok(Some(hash.clone())); } } Ok(None) } async fn read_oauth2_secret( client: &kube::Client, ns: &str, network: &ErgoNetwork, ) -> Result, Error> { let oauth2 = match &network.spec.oauth2 { Some(o) => o, None => return Ok(None), }; let secret_api: Api = Api::namespaced(client.clone(), ns); let secret = secret_api .get(&oauth2.secret_ref) .await .map_err(Error::Kube)?; if let Some(data) = &secret.data { if let Some(bytes) = data.get("client-secret") { return Ok(Some(String::from_utf8_lossy(&bytes.0).to_string())); } } Ok(None) } pub fn error_policy( _network: Arc, error: &Error, _ctx: Arc, ) -> Action { tracing::warn!(%error, "reconcile failed"); Action::requeue(Duration::from_secs(60)) } async fn update_status( client: &kube::Client, network: &ErgoNetwork, ready: bool, message: &str, ) -> Result<(), Error> { let ns = network.metadata.namespace.as_deref().unwrap_or("default"); let api = Api::::namespaced(client.clone(), ns); let name = network.metadata.name.as_deref().unwrap(); let now = chrono::Utc::now().to_rfc3339(); let status = ErgoNetworkStatus { conditions: vec![Condition { type_: "Ready".to_string(), status: if ready { "True" } else { "False" }.to_string(), reason: if ready { "Reconciled" } else { "Error" }.to_string(), message: message.to_string(), last_transition_time: now, }], observed_generation: network.metadata.generation, }; let patch = serde_json::json!({ "apiVersion": "irc.now/v1alpha1", "kind": "ErgoNetwork", "status": status, }); api.patch_status(name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch)) .await .map_err(Error::Kube)?; Ok(()) }