use kube::Api; use metrics::{counter, gauge}; use sqlx::PgPool; use crate::k8s::SojuBouncer; pub fn login_counter() { counter!("irc_now_logins_total").increment(1); } pub async fn record_stripe_metrics(stripe_client: stripe::Client) { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); loop { interval.tick().await; let mut params = stripe::ListSubscriptions::new(); params.status = Some(stripe::SubscriptionStatusFilter::Active); params.limit = Some(100); match stripe::Subscription::list(&stripe_client, ¶ms).await { Ok(list) => { let active = list.data.len() as f64; let mut mrr_cents: i64 = 0; for sub in &list.data { for item in &sub.items.data { if let Some(ref price) = item.price { let unit = price.unit_amount.unwrap_or(0); let qty = item.quantity.unwrap_or(1) as i64; mrr_cents += unit * qty; } } } gauge!("irc_now_mrr_cents").set(mrr_cents as f64); gauge!("irc_now_subscriptions_active").set(active); } Err(e) => { tracing::warn!("stripe subscription list failed: {e}"); } } } } pub async fn record_event_metrics(db: PgPool) { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); loop { interval.tick().await; if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM events WHERE event_type = 'signup' AND created_at > NOW() - INTERVAL '7 days'", ) .fetch_one(&db) .await { gauge!("irc_now_signups_7d").set(count as f64); } let upgrades_30d: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM events WHERE event_type = 'plan_upgrade' AND created_at > NOW() - INTERVAL '30 days'", ) .fetch_one(&db) .await .unwrap_or(0); let signups_30d: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM events WHERE event_type = 'signup' AND created_at > NOW() - INTERVAL '30 days'", ) .fetch_one(&db) .await .unwrap_or(0); let conversion_rate = if signups_30d > 0 { upgrades_30d as f64 / signups_30d as f64 } else { 0.0 }; gauge!("irc_now_conversion_rate_30d").set(conversion_rate); let downgrades_30d: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM events WHERE event_type = 'plan_downgrade' AND created_at > NOW() - INTERVAL '30 days'", ) .fetch_one(&db) .await .unwrap_or(0); let pro_users: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM users WHERE plan = 'pro'", ) .fetch_one(&db) .await .unwrap_or(0); let churn_rate = if pro_users > 0 { downgrades_30d as f64 / pro_users as f64 } else { 0.0 }; gauge!("irc_now_churn_rate_30d").set(churn_rate); if let Ok(total) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM users", ) .fetch_one(&db) .await { gauge!("irc_now_users_total").set(total as f64); } if let Ok(rows) = sqlx::query_as::<_, (String, i64)>( "SELECT plan, COUNT(*) FROM users GROUP BY plan", ) .fetch_all(&db) .await { for (plan, count) in rows { gauge!("irc_now_users_by_plan", "plan" => plan).set(count as f64); } } if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM events WHERE event_type = 'signup' AND created_at > NOW() - INTERVAL '30 days'", ) .fetch_one(&db) .await { gauge!("irc_now_funnel_signups_30d").set(count as f64); } if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(DISTINCT s.user_sub) FROM events s \ JOIN events bc ON s.user_sub = bc.user_sub AND bc.event_type = 'bouncer_create' \ WHERE s.event_type = 'signup' AND s.created_at > NOW() - INTERVAL '30 days' \ AND bc.created_at >= s.created_at", ) .fetch_one(&db) .await { gauge!("irc_now_funnel_bouncer_created_30d").set(count as f64); } if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(DISTINCT s.user_sub) FROM events s \ WHERE s.event_type = 'signup' AND s.created_at > NOW() - INTERVAL '30 days' \ AND EXISTS ( \ SELECT 1 FROM events e2 WHERE e2.user_sub = s.user_sub \ AND e2.event_type = 'login' AND e2.created_at > s.created_at + INTERVAL '1 day' \ )", ) .fetch_one(&db) .await { gauge!("irc_now_funnel_returned_30d").set(count as f64); } let ttfb: Option = sqlx::query_scalar( "SELECT AVG(EXTRACT(EPOCH FROM (bc.created_at - s.created_at))) \ FROM events s \ JOIN events bc ON s.user_sub = bc.user_sub AND bc.event_type = 'bouncer_create' \ WHERE s.event_type = 'signup' AND s.created_at > NOW() - INTERVAL '30 days' \ AND bc.created_at >= s.created_at", ) .fetch_one(&db) .await .unwrap_or(None); gauge!("irc_now_time_to_first_bouncer_seconds").set(ttfb.unwrap_or(0.0)); if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM users WHERE plan = 'pro' AND content_expires = false", ) .fetch_one(&db) .await { gauge!("irc_now_adoption_permanent_content").set(count as f64); } if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(DISTINCT user_sub) FROM events WHERE event_type = 'bouncer_create'", ) .fetch_one(&db) .await { gauge!("irc_now_adoption_bouncer_users").set(count as f64); } if let Ok(count) = sqlx::query_scalar::<_, i64>( "SELECT COUNT(DISTINCT user_sub) FROM events WHERE event_type = 'network_create'", ) .fetch_one(&db) .await { gauge!("irc_now_adoption_network_users").set(count as f64); } } } pub async fn record_soju_metrics(kube: kube::Client, namespace: String) { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); loop { interval.tick().await; let api: Api = Api::namespaced(kube.clone(), &namespace); let bouncers = match api.list(&Default::default()).await { Ok(list) => list.items, Err(e) => { tracing::warn!("failed to list SojuBouncer CRs: {e}"); continue; } }; let mut total_networks: i64 = 0; let mut total_channels: i64 = 0; let mut total_messages: i64 = 0; let mut total_active: i64 = 0; let mut total_connected_5m: i64 = 0; for bouncer in &bouncers { let name = match &bouncer.metadata.name { Some(n) => n.clone(), None => continue, }; let secret_name = format!("{name}-db"); let secrets: kube::Api = kube::Api::namespaced(kube.clone(), &namespace); let secret = match secrets.get(&secret_name).await { Ok(s) => s, Err(e) => { tracing::debug!("no db secret for bouncer {name}: {e}"); continue; } }; let uri = match secret .data .as_ref() .and_then(|d| d.get("uri")) .and_then(|v| String::from_utf8(v.0.clone()).ok()) { Some(u) => u, None => continue, }; let (client, connection) = match tokio_postgres::connect(&uri, tokio_postgres::NoTls).await { Ok(c) => c, Err(e) => { tracing::debug!("failed to connect to bouncer db {name}: {e}"); continue; } }; tokio::spawn(async move { if let Err(e) = connection.await { tracing::debug!("bouncer db connection error: {e}"); } }); if let Ok(row) = client.query_one("SELECT COUNT(*) FROM \"Network\"", &[]).await { let count: i64 = row.get(0); total_networks += count; } if let Ok(row) = client.query_one("SELECT COUNT(*) FROM \"Channel\"", &[]).await { let count: i64 = row.get(0); total_channels += count; } match client.query_one("SELECT COUNT(*) FROM \"MessageTarget\"", &[]).await { Ok(row) => { let count: i64 = row.get(0); total_messages += count; } Err(e) => { tracing::debug!("MessageTarget query failed for {name}, skipping: {e}"); } } if let Ok(row) = client .query_one( "SELECT COUNT(*) FROM \"User\" WHERE downstream_interacted_at > NOW() - INTERVAL '24 hours'", &[], ) .await { let count: i64 = row.get(0); total_active += count; } if let Ok(row) = client .query_one( "SELECT COUNT(*) FROM \"User\" WHERE downstream_interacted_at > NOW() - INTERVAL '5 minutes'", &[], ) .await { let count: i64 = row.get(0); total_connected_5m += count; } if let Ok(rows) = client .query( "SELECT COALESCE(n.name, n.addr) AS label, COUNT(dr.*) \ FROM \"Network\" n \ LEFT JOIN \"DeliveryReceipt\" dr ON dr.network = n.id \ GROUP BY COALESCE(n.name, n.addr)", &[], ) .await { if rows.len() <= 50 { for row in &rows { let label: &str = row.get(0); let count: i64 = row.get(1); gauge!("irc_now_bouncer_deliveries_by_network", "network" => label.to_string()) .set(count as f64); } } else { tracing::debug!("bouncer {name}: {n} networks, skipping per-network breakdown", n = rows.len()); } } } gauge!("irc_now_bouncer_networks_total").set(total_networks as f64); gauge!("irc_now_bouncer_channels_total").set(total_channels as f64); gauge!("irc_now_bouncer_messages_total").set(total_messages as f64); gauge!("irc_now_bouncer_active_users").set(total_active as f64); gauge!("irc_now_active_users_24h").set(total_active as f64); gauge!("irc_now_bouncer_connected_users_5m").set(total_connected_5m as f64); } }