use futures::TryStreamExt; use k8s_openapi::api::core::v1::Secret; use kube::api::{Api, Patch, PatchParams}; use kube::runtime::{watcher, WatchStreamExt}; use serde_json::json; use crate::ergo_admin; use crate::k8s::SojuBouncer; use crate::state::AppState; const ANNOTATION_KEY: &str = "irc.now/upstream-configured"; pub async fn watch_bouncers(state: AppState) { loop { tracing::info!("starting bouncer watcher"); let api: Api = Api::namespaced(state.kube.clone(), &state.namespace); let result = watcher(api, watcher::Config::default()) .applied_objects() .default_backoff() .try_for_each(|bouncer| { let state = state.clone(); async move { if let Err(e) = handle_bouncer(&state, &bouncer).await { tracing::warn!( bouncer = ?bouncer.metadata.name, "upstream setup failed: {e}" ); } Ok(()) } }) .await; if let Err(e) = result { tracing::warn!("bouncer watcher stream ended: {e}"); } tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } async fn handle_bouncer(state: &AppState, bouncer: &SojuBouncer) -> Result<(), String> { let name = bouncer .metadata .name .as_deref() .ok_or("bouncer has no name")?; let ready = bouncer .status .as_ref() .and_then(|s| s.conditions.first()) .map(|c| c.type_ == "Ready" && c.status == "True") .unwrap_or(false); if !ready { return Ok(()); } let already_configured = bouncer .metadata .annotations .as_ref() .map(|a| a.contains_key(ANNOTATION_KEY)) .unwrap_or(false); if already_configured { return Ok(()); } tracing::info!("configuring upstream for bouncer '{name}'"); let secret_api: Api = Api::namespaced(state.kube.clone(), &state.namespace); let secret = secret_api .get(&format!("{name}-db")) .await .map_err(|e| format!("failed to get db secret: {e}"))?; let uri = secret .data .as_ref() .and_then(|d| d.get("uri")) .map(|b| String::from_utf8_lossy(&b.0).to_string()) .ok_or("missing uri in db secret")?; let (client, connection) = tokio_postgres::connect(&uri, tokio_postgres::NoTls) .await .map_err(|e| format!("soju db connect failed: {e}"))?; tokio::spawn(async move { if let Err(e) = connection.await { tracing::warn!("soju db connection error: {e}"); } }); let mut retries = 0; let username: String = loop { match client .query_opt(r#"SELECT username FROM "User" LIMIT 1"#, &[]) .await .map_err(|e| format!("soju db query failed: {e}"))? { Some(row) => break row.get(0), None if retries < 10 => { retries += 1; tokio::time::sleep(std::time::Duration::from_secs(2)).await; } None => return Err("soju User table empty after retries".to_string()), } }; let ergo_password = ergo_admin::generate_password(); ergo_admin::ensure_ergo_account(&state.kube, &state.namespace, &username, &ergo_password) .await?; client .execute( r#"INSERT INTO "Network" ("user", name, addr, nick, enabled, sasl_mechanism, sasl_plain_username, sasl_plain_password) SELECT id, 'irc.now', 'irc+insecure://irc-now-net:6667', username, true, 'PLAIN', username, $1 FROM "User" LIMIT 1 ON CONFLICT (name, "user") DO UPDATE SET sasl_mechanism = 'PLAIN', sasl_plain_username = EXCLUDED.sasl_plain_username, sasl_plain_password = EXCLUDED.sasl_plain_password"#, &[&ergo_password], ) .await .map_err(|e| format!("network upsert failed: {e}"))?; let bouncer_api: Api = Api::namespaced(state.kube.clone(), &state.namespace); bouncer_api .patch( name, &PatchParams::apply("web-api"), &Patch::Merge(json!({ "metadata": { "annotations": { ANNOTATION_KEY: "true" } } })), ) .await .map_err(|e| format!("failed to annotate bouncer: {e}"))?; tracing::info!("upstream configured for bouncer '{name}' (user: {username})"); Ok(()) }