use std::sync::Arc; use std::time::Duration; use k8s_openapi::api::apps::v1::Deployment; use k8s_openapi::api::core::v1::{ConfigMap, Secret, Service}; use k8s_crds_cert_manager::certificates::Certificate; use kube::api::{Api, DynamicObject, Patch, PatchParams}; use kube::core::{ApiResource, GroupVersionKind}; use kube::runtime::controller::Action; use kube::runtime::finalizer::{finalizer, Event}; use crate::db; use crate::resources::certificate::build_certificate; use crate::resources::configmap::build_configmap; use crate::resources::deployment::build_deployment; use crate::resources::route::{build_edge_route, build_route}; use crate::resources::secret::build_db_secret; use crate::resources::service::build_service; use crate::types::{Condition, SojuBouncer, SojuBouncerStatus}; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Kube error: {0}")] Kube(#[source] kube::Error), #[error("Finalizer error: {0}")] Finalizer(#[source] Box>), #[error("Database error: {0}")] Db(String), #[error("Missing field: {0}")] MissingField(String), #[error("Serialization error: {0}")] Serialization(#[source] serde_json::Error), } pub struct Context { pub client: kube::Client, pub db_host: String, pub db_port: u16, pub db_pool: deadpool_postgres::Pool, } static FINALIZER_NAME: &str = "irc.now/db-cleanup"; static FIELD_MANAGER: &str = "soju-operator"; pub async fn reconcile( bouncer: Arc, ctx: Arc, ) -> Result { let client = &ctx.client; let ns = bouncer .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let api: Api = Api::namespaced(client.clone(), ns); finalizer(&api, FINALIZER_NAME, bouncer, |event| async { match event { Event::Apply(bouncer) => { let result = reconcile_apply(&bouncer, &ctx).await; if let Err(ref e) = result { let _ = update_status(&ctx.client, &bouncer, false, &e.to_string()).await; } result } Event::Cleanup(bouncer) => reconcile_cleanup(&bouncer, &ctx).await, } }) .await .map_err(|e| Error::Finalizer(Box::new(e))) } async fn reconcile_apply(bouncer: &SojuBouncer, ctx: &Context) -> Result { let client = &ctx.client; let ns = bouncer .metadata .namespace .as_deref() .ok_or_else(|| Error::MissingField("metadata.namespace".to_string()))?; let name = bouncer .metadata .name .as_deref() .ok_or_else(|| Error::MissingField("metadata.name".to_string()))?; tracing::info!(bouncer = name, namespace = ns, "reconciling"); let password = get_or_create_password(client, ns, name).await?; let tenant_uri = db::build_tenant_uri(&ctx.db_host, ctx.db_port, name, &password); let db_client = ctx.db_pool.get().await.map_err(|e| Error::Db(e.to_string()))?; db::provision_tenant_db(&db_client, name, &password) .await .map_err(|e| Error::Db(e.to_string()))?; let secret = build_db_secret(bouncer, &tenant_uri, &password); let secret_api: Api = Api::namespaced(client.clone(), ns); let secret_name = secret.metadata.name.as_deref().unwrap(); secret_api .patch( secret_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&secret), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied db secret"); if bouncer.spec.tls.is_some() { let cert = build_certificate(bouncer); let cert_api: Api = Api::namespaced(client.clone(), ns); let cert_name = cert.metadata.name.as_deref().unwrap(); cert_api .patch( cert_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&cert), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied certificate"); } let auth_url = resolve_auth_url(client, ns, bouncer).await?; let cm = build_configmap(bouncer, &tenant_uri, auth_url.as_deref()); let cm_api: Api = Api::namespaced(client.clone(), ns); let cm_name = cm.metadata.name.as_deref().unwrap(); cm_api .patch( cm_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&cm), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied configmap"); let config_data = cm.data.as_ref().and_then(|d| d.get("config")).cloned().unwrap_or_default(); let config_hash = { use sha2::{Digest, Sha256}; let hash = Sha256::digest(config_data.as_bytes()); format!("{:x}", hash) }; let dep = build_deployment(bouncer, &config_hash); let dep_api: Api = Api::namespaced(client.clone(), ns); dep_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&dep), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied deployment"); let svc = build_service(bouncer); let svc_api: Api = Api::namespaced(client.clone(), ns); svc_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&svc), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied service"); if bouncer.spec.route.is_some() { let route = build_route(bouncer); let gvk = GroupVersionKind::gvk("route.openshift.io", "v1", "Route"); let ar = ApiResource::from_gvk_with_plural(&gvk, "routes"); let route_api: Api = Api::namespaced_with(client.clone(), ns, &ar); let patch_value = serde_json::json!({ "apiVersion": "route.openshift.io/v1", "kind": "Route", "metadata": route.metadata, "spec": route.spec, }); route_api .patch( name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch_value), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied route"); } if name != "soju-shared" { let edge_route = build_edge_route(bouncer); let gvk = GroupVersionKind::gvk("route.openshift.io", "v1", "Route"); let ar = ApiResource::from_gvk_with_plural(&gvk, "routes"); let route_api: Api = Api::namespaced_with(client.clone(), ns, &ar); let edge_route_name = edge_route.metadata.name.as_deref().unwrap(); let patch_value = serde_json::json!({ "apiVersion": "route.openshift.io/v1", "kind": "Route", "metadata": edge_route.metadata, "spec": edge_route.spec, }); route_api .patch( edge_route_name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch_value), ) .await .map_err(Error::Kube)?; tracing::info!(bouncer = name, "applied edge route"); } update_status(client, bouncer, true, "all resources applied").await?; tracing::info!(bouncer = name, "reconciled"); Ok(Action::requeue(Duration::from_secs(300))) } async fn reconcile_cleanup(bouncer: &SojuBouncer, ctx: &Context) -> Result { let name = bouncer .metadata .name .as_deref() .ok_or_else(|| Error::MissingField("metadata.name".to_string()))?; tracing::info!(bouncer = name, "cleaning up"); let db_client = ctx.db_pool.get().await.map_err(|e| Error::Db(e.to_string()))?; db::deprovision_tenant_db(&db_client, name) .await .map_err(|e| Error::Db(e.to_string()))?; tracing::info!(bouncer = name, "deprovisioned tenant database"); Ok(Action::await_change()) } pub fn error_policy( _bouncer: Arc, error: &Error, _ctx: Arc, ) -> Action { tracing::warn!(%error, "reconcile failed"); Action::requeue(Duration::from_secs(60)) } async fn get_or_create_password( client: &kube::Client, ns: &str, bouncer_name: &str, ) -> Result { let secret_api: Api = Api::namespaced(client.clone(), ns); let secret_name = format!("{bouncer_name}-db"); match secret_api.get_opt(&secret_name).await.map_err(Error::Kube)? { Some(existing) => { if let Some(string_data) = &existing.string_data { if let Some(pw) = string_data.get("password") { return Ok(pw.clone()); } } if let Some(data) = &existing.data { if let Some(pw_bytes) = data.get("password") { return Ok(String::from_utf8_lossy(&pw_bytes.0).to_string()); } } Err(Error::MissingField("password key not found in existing db secret".to_string())) } None => Ok(db::generate_password()), } } async fn resolve_auth_url( client: &kube::Client, ns: &str, bouncer: &SojuBouncer, ) -> Result, Error> { let auth = match &bouncer.spec.auth { Some(a) => a, None => return Ok(None), }; if let Some(secret_name) = &auth.secret_ref { let secret_api: Api = Api::namespaced(client.clone(), ns); let secret = secret_api.get(secret_name).await.map_err(Error::Kube)?; let data = secret.data.as_ref().ok_or_else(|| { Error::MissingField(format!("secret {secret_name} has no data")) })?; let client_id = data .get("client-id") .map(|v| String::from_utf8_lossy(&v.0).to_string()) .unwrap_or_default(); let client_secret = data .get("client-secret") .map(|v| String::from_utf8_lossy(&v.0).to_string()) .unwrap_or_default(); let mut auth_url = url::Url::parse(&auth.oauth2_url).map_err(|e| { Error::MissingField(format!("invalid oauth2 url: {e}")) })?; auth_url .set_username(&client_id) .map_err(|_| Error::MissingField("cannot set username".to_string()))?; auth_url .set_password(Some(&client_secret)) .map_err(|_| Error::MissingField("cannot set password".to_string()))?; Ok(Some(auth_url.to_string())) } else { Ok(Some(auth.oauth2_url.clone())) } } async fn update_status( client: &kube::Client, bouncer: &SojuBouncer, ready: bool, message: &str, ) -> Result<(), Error> { let ns = bouncer.metadata.namespace.as_deref().unwrap_or("default"); let api = Api::::namespaced(client.clone(), ns); let name = bouncer.metadata.name.as_deref().unwrap(); let now = chrono::Utc::now().to_rfc3339(); let status = SojuBouncerStatus { conditions: vec![Condition { type_: "Ready".to_string(), status: if ready { "True" } else { "False" }.to_string(), reason: if ready { "Reconciled" } else { "Error" }.to_string(), message: message.to_string(), last_transition_time: now, }], observed_generation: bouncer.metadata.generation, }; let patch = serde_json::json!({ "apiVersion": "irc.now/v1alpha1", "kind": "SojuBouncer", "status": status, }); api.patch_status(name, &PatchParams::apply(FIELD_MANAGER).force(), &Patch::Apply(&patch)) .await .map_err(Error::Kube)?; Ok(()) }