use std::sync::Arc; use std::time::Duration; use k8s_openapi::api::apps::v1::Deployment; use k8s_openapi::api::core::v1::{ConfigMap, Secret, Service}; use k8s_crds_cert_manager::certificates::Certificate; use kube::api::{Api, DynamicObject, Patch, PatchParams}; use kube::core::{ApiResource, GroupVersionKind}; use kube::runtime::controller::Action; use kube::runtime::finalizer::{finalizer, Event}; use crate::db; use crate::resources::certificate::build_certificate; use crate::resources::configmap::build_configmap; use crate::resources::deployment::build_deployment; use crate::resources::route::build_route; use crate::resources::secret::build_db_secret; use crate::resources::service::build_service; use crate::types::{Condition, SojuBouncer, SojuBouncerStatus}; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Kube error: {0}")] Kube(#[source] kube::Error), #[error("Finalizer error: {0}")] Finalizer(#[source] Box>), #[error("Database error: {0}")] Db(String), #[error("Missing field: {0}")] MissingField(String), #[error("Serialization error: {0}")] Serialization(#[source] serde_json::Error), } pub struct Context { pub client: kube::Client, pub db_host: String, pub db_port: u16, pub db_master_uri: String, } static FINALIZER_NAME: &str = "irc.josie.cloud/db-cleanup"; static FIELD_MANAGER: &str = "soju-operator"; pub async fn reconcile( bouncer: Arc, ctx: Arc, ) -> Result { let client = &ctx.client; let ns = bouncer .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let api: Api = Api::namespaced(client.clone(), ns); finalizer(&api, FINALIZER_NAME, bouncer, |event| async { match event { Event::Apply(bouncer) => reconcile_apply(&bouncer, &ctx).await, Event::Cleanup(bouncer) => reconcile_cleanup(&bouncer, &ctx).await, } }) .await .map_err(|e| Error::Finalizer(Box::new(e))) } async fn reconcile_apply(bouncer: &SojuBouncer, ctx: &Context) -> Result { let client = &ctx.client; let ns = bouncer .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let name = bouncer .metadata .name .as_deref() .ok_or_else(|| Error::MissingField("metadata.name".to_string()))?; tracing::info!(bouncer = name, namespace = ns, "reconciling"); let password = get_or_create_password(client, ns, name).await?; let tenant_uri = db::build_tenant_uri(&ctx.db_host, ctx.db_port, name, &password); let db_client = connect_db(&ctx.db_master_uri).await?; db::provision_tenant_db(&db_client, name, &password) .await .map_err(|e| Error::Db(e.to_string()))?; let secret = build_db_secret(bouncer, &tenant_uri); let secret_api: Api = Api::namespaced(client.clone(), ns); let secret_name = secret.metadata.name.as_deref().unwrap(); secret_api .patch( secret_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&secret), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied db secret"); if bouncer.spec.tls.is_some() { let cert = build_certificate(bouncer); let cert_api: Api = Api::namespaced(client.clone(), ns); let cert_name = cert.metadata.name.as_deref().unwrap(); cert_api .patch( cert_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&cert), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied certificate"); } let cm = build_configmap(bouncer, &tenant_uri); let cm_api: Api = Api::namespaced(client.clone(), ns); let cm_name = cm.metadata.name.as_deref().unwrap(); cm_api .patch( cm_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&cm), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied configmap"); let dep = build_deployment(bouncer); let dep_api: Api = Api::namespaced(client.clone(), ns); dep_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&dep), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied deployment"); let svc = build_service(bouncer); let svc_api: Api = Api::namespaced(client.clone(), ns); svc_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&svc), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied service"); let route = build_route(bouncer); let gvk = GroupVersionKind::gvk("route.openshift.io", "v1", "Route"); let ar = ApiResource::from_gvk_with_plural(&gvk, "routes"); let route_api: Api = Api::namespaced_with(client.clone(), ns, &ar); let patch_value = serde_json::json!({ "apiVersion": "route.openshift.io/v1", "kind": "Route", "metadata": route.metadata, "spec": route.spec, }); route_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch_value), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied route"); update_status(client, bouncer, true, "all resources applied").await?; tracing::info!(bouncer = name, "reconciled"); Ok(Action::requeue(Duration::from_secs(300))) } async fn reconcile_cleanup(bouncer: &SojuBouncer, ctx: &Context) -> Result { let name = bouncer .metadata .name .as_deref() .ok_or_else(|| Error::MissingField("metadata.name".to_string()))?; tracing::info!(bouncer = name, "cleaning up"); let db_client = connect_db(&ctx.db_master_uri).await?; db::deprovision_tenant_db(&db_client, name) .await .map_err(|e| Error::Db(e.to_string()))?; tracing::info!(bouncer = name, "deprovisioned tenant database"); Ok(Action::await_change()) } pub fn error_policy( _bouncer: Arc, error: &Error, _ctx: Arc, ) -> Action { tracing::warn!(%error, "reconcile failed"); Action::requeue(Duration::from_secs(60)) } async fn get_or_create_password( client: &kube::Client, ns: &str, bouncer_name: &str, ) -> Result { let secret_api: Api = Api::namespaced(client.clone(), ns); let secret_name = format!("{bouncer_name}-db"); match secret_api.get_opt(&secret_name).await.map_err(Error::Kube)? { Some(existing) => { if let Some(data) = &existing.data { if let Some(uri_bytes) = data.get("uri") { let uri = String::from_utf8_lossy(&uri_bytes.0); if let Some(pw) = extract_password_from_uri(&uri) { return Ok(pw); } } } if let Some(string_data) = &existing.string_data { if let Some(uri) = string_data.get("uri") { if let Some(pw) = extract_password_from_uri(uri) { return Ok(pw); } } } Ok(db::generate_password()) } None => Ok(db::generate_password()), } } fn extract_password_from_uri(uri: &str) -> Option { for part in uri.split_whitespace() { if let Some(pw) = part.strip_prefix("password=") { return Some(pw.to_string()); } } None } async fn update_status( client: &kube::Client, bouncer: &SojuBouncer, ready: bool, message: &str, ) -> Result<(), Error> { let ns = bouncer.metadata.namespace.as_deref().unwrap_or("default"); let api = Api::::namespaced(client.clone(), ns); let name = bouncer.metadata.name.as_deref().unwrap(); let now = chrono::Utc::now().to_rfc3339(); let status = SojuBouncerStatus { conditions: vec![Condition { type_: "Ready".to_string(), status: if ready { "True" } else { "False" }.to_string(), reason: if ready { "Reconciled" } else { "Error" }.to_string(), message: message.to_string(), last_transition_time: now, }], observed_generation: bouncer.metadata.generation, }; let patch = serde_json::json!({ "apiVersion": "irc.josie.cloud/v1alpha1", "kind": "SojuBouncer", "status": status, }); api.patch_status(name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch)) .await .map_err(Error::Kube)?; Ok(()) } async fn connect_db(uri: &str) -> Result { let (client, connection) = tokio_postgres::connect(uri, tokio_postgres::NoTls) .await .map_err(|e| Error::Db(e.to_string()))?; tokio::spawn(async move { if let Err(e) = connection.await { tracing::error!("db connection error: {}", e); } }); Ok(client) }