use k8s_openapi::api::core::v1::Secret; use kube::Api; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; const ERGO_HOST: &str = "irc-now-net"; const ERGO_PORT: u16 = 6667; const OPER_SECRET_NAME: &str = "irc-now-net-oper"; const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); async fn read_oper_password(kube: &kube::Client, namespace: &str) -> Result { let api: Api = Api::namespaced(kube.clone(), namespace); let secret = api .get(OPER_SECRET_NAME) .await .map_err(|e| format!("failed to get oper secret: {e}"))?; secret .data .as_ref() .and_then(|d| d.get("password")) .map(|b| String::from_utf8_lossy(&b.0).trim().to_string()) .ok_or_else(|| "missing 'password' key in oper secret".to_string()) } pub async fn register_ergo_account( kube: &kube::Client, namespace: &str, username: &str, password: &str, ) -> Result<(), String> { let oper_password = read_oper_password(kube, namespace).await?; tokio::time::timeout(TIMEOUT, async { let stream = TcpStream::connect((ERGO_HOST, ERGO_PORT)) .await .map_err(|e| format!("connect to ergo failed: {e}"))?; let (reader, mut writer) = stream.into_split(); let mut lines = BufReader::new(reader).lines(); writer .write_all(b"NICK ergo-admin\r\nUSER ergo-admin 0 * :admin\r\n") .await .map_err(|e| format!("write failed: {e}"))?; loop { let line = lines .next_line() .await .map_err(|e| format!("read failed: {e}"))? .ok_or("connection closed before welcome")?; if line.starts_with("PING") { let token = line.splitn(2, ' ').nth(1).unwrap_or(""); writer .write_all(format!("PONG {token}\r\n").as_bytes()) .await .map_err(|e| format!("write failed: {e}"))?; } if line.contains(" 001 ") { break; } } writer .write_all(format!("OPER admin {oper_password}\r\n").as_bytes()) .await .map_err(|e| format!("write failed: {e}"))?; loop { let line = lines .next_line() .await .map_err(|e| format!("read failed: {e}"))? .ok_or("connection closed before oper reply")?; if line.starts_with("PING") { let token = line.splitn(2, ' ').nth(1).unwrap_or(""); writer .write_all(format!("PONG {token}\r\n").as_bytes()) .await .map_err(|e| format!("write failed: {e}"))?; } if line.contains(" 381 ") { break; } if line.contains(" 491 ") || line.contains("ERR_NOOPERHOST") { return Err("OPER authentication failed".to_string()); } } writer .write_all( format!("NS SAREGISTER {username} {password}\r\n").as_bytes(), ) .await .map_err(|e| format!("write failed: {e}"))?; loop { let line = lines .next_line() .await .map_err(|e| format!("read failed: {e}"))? .ok_or("connection closed before SAREGISTER reply")?; if line.starts_with("PING") { let token = line.splitn(2, ' ').nth(1).unwrap_or(""); writer .write_all(format!("PONG {token}\r\n").as_bytes()) .await .map_err(|e| format!("write failed: {e}"))?; } if line.contains("NickServ") || line.contains("nickserv") { let _ = writer.write_all(b"QUIT\r\n").await; let lower = line.to_lowercase(); if lower.contains("account already exists") { tracing::info!("ergo account '{username}' already exists"); return Ok(()); } if lower.contains("successfully registered") || lower.contains("registered account") { tracing::info!("registered ergo account '{username}'"); return Ok(()); } if lower.contains("error") || lower.contains("fail") { return Err(format!("SAREGISTER failed: {line}")); } tracing::info!("SAREGISTER response: {line}"); return Ok(()); } } }) .await .map_err(|_| "ergo admin connection timed out".to_string())? } pub async fn backfill_sasl_if_needed( kube: &kube::Client, namespace: &str, user_sub: &str, ) { use kube::api::ListParams; let bouncer_api: Api = Api::namespaced(kube.clone(), namespace); let lp = ListParams::default().labels(&format!("irc.now/owner={user_sub}")); let bouncers = match bouncer_api.list(&lp).await { Ok(list) => list.items, Err(_) => return, }; let secret_api: Api = Api::namespaced(kube.clone(), namespace); for bouncer in bouncers { let name = match bouncer.metadata.name { Some(ref n) => n.clone(), None => continue, }; let secret = match secret_api.get(&format!("{name}-db")).await { Ok(s) => s, Err(_) => continue, }; let uri = match secret .data .as_ref() .and_then(|d| d.get("uri")) .map(|b| String::from_utf8_lossy(&b.0).to_string()) { Some(u) => u, None => continue, }; let (client, connection) = match tokio_postgres::connect(&uri, tokio_postgres::NoTls).await { Ok(c) => c, Err(_) => continue, }; tokio::spawn(async move { if let Err(e) = connection.await { tracing::warn!("soju db connection error: {e}"); } }); let row = match client .query_opt( r#"SELECT "user", sasl_mechanism, n.name as net_name, u.username FROM "Network" n JOIN "User" u ON n."user" = u.id WHERE n.name = 'irc.now' LIMIT 1"#, &[], ) .await { Ok(Some(row)) => row, _ => continue, }; let sasl: Option = row.get(1); if sasl.is_some() { continue; } let user_id: i64 = row.get(0); let username: String = row.get(3); let ergo_password = generate_password(); if let Err(e) = register_ergo_account(kube, namespace, &username, &ergo_password).await { tracing::warn!("ergo backfill registration failed for {username}: {e}"); continue; } if let Err(e) = client .execute( r#"UPDATE "Network" SET sasl_mechanism = 'PLAIN', sasl_plain_username = $1, sasl_plain_password = $2 WHERE "user" = $3 AND name = 'irc.now'"#, &[&username, &ergo_password, &user_id], ) .await { tracing::warn!("failed to update SASL for {username}: {e}"); } else { tracing::info!("backfilled SASL credentials for {username}"); } } } pub fn generate_password() -> String { use rand::Rng; const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; let mut rng = rand::thread_rng(); (0..24) .map(|_| { let idx = rng.gen_range(0..CHARSET.len()); CHARSET[idx] as char }) .collect() }