From c3df6c5beaa53f4f2c438d947fac6696e8af7052 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Fri, 24 Jan 2025 21:21:28 +0100 Subject: [PATCH] feat(compute): Add some basic compute_ctl metrics --- compute_tools/src/http/routes/metrics.rs | 7 ++- compute_tools/src/installed_extensions.rs | 18 +------ compute_tools/src/lib.rs | 1 + compute_tools/src/metrics.rs | 63 +++++++++++++++++++++++ compute_tools/src/migration.rs | 46 ++++++++++------- compute_tools/src/spec.rs | 39 ++++++++++---- 6 files changed, 124 insertions(+), 50 deletions(-) create mode 100644 compute_tools/src/metrics.rs diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs index 40d71b5de7c6..13150a75888b 100644 --- a/compute_tools/src/http/routes/metrics.rs +++ b/compute_tools/src/http/routes/metrics.rs @@ -2,17 +2,16 @@ use axum::{body::Body, response::Response}; use http::header::CONTENT_TYPE; use http::StatusCode; use metrics::proto::MetricFamily; -use metrics::Encoder; -use metrics::TextEncoder; +use metrics::{Encoder, TextEncoder}; -use crate::{http::JsonResponse, installed_extensions}; +use crate::{http::JsonResponse, metrics::collect}; /// Expose Prometheus metrics. pub(in crate::http) async fn get_metrics() -> Response { // When we call TextEncoder::encode() below, it will immediately return an // error if a metric family has no metrics, so we need to preemptively // filter out metric families with no metrics. - let metrics = installed_extensions::collect() + let metrics = collect() .into_iter() .filter(|m| !m.get_metric().is_empty()) .collect::>(); diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index 0ab259ddf119..173dbf40b0c2 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -1,13 +1,10 @@ use compute_api::responses::{InstalledExtension, InstalledExtensions}; -use metrics::proto::MetricFamily; use std::collections::HashMap; use anyhow::Result; use postgres::{Client, NoTls}; -use metrics::core::Collector; -use metrics::{register_uint_gauge_vec, UIntGaugeVec}; -use once_cell::sync::Lazy; +use crate::metrics::INSTALLED_EXTENSIONS; /// We don't reuse get_existing_dbs() just for code clarity /// and to make database listing query here more explicit. @@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result = Lazy::new(|| { - register_uint_gauge_vec!( - "compute_installed_extensions", - "Number of databases where the version of extension is installed", - &["extension_name", "version", "owned_by_superuser"] - ) - .expect("failed to define a metric") -}); - -pub fn collect() -> Vec { - INSTALLED_EXTENSIONS.collect() -} diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 12fea4e61a09..b08df22134f3 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -16,6 +16,7 @@ pub mod extension_server; pub mod installed_extensions; pub mod local_proxy; pub mod lsn_lease; +pub mod metrics; mod migration; pub mod monitor; pub mod params; diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs new file mode 100644 index 000000000000..bbc368121892 --- /dev/null +++ b/compute_tools/src/metrics.rs @@ -0,0 +1,63 @@ +use metrics::core::Collector; +use metrics::proto::MetricFamily; +use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec}; +use once_cell::sync::Lazy; + +pub(crate) static INSTALLED_EXTENSIONS: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "compute_installed_extensions", + "Number of databases where the version of extension is installed", + &["extension_name", "version", "owned_by_superuser"] + ) + .expect("failed to define a metric") +}); + +pub enum CPlaneRequestMethod { + GetSpec, +} + +impl CPlaneRequestMethod { + pub fn as_str(&self) -> &str { + match self { + CPlaneRequestMethod::GetSpec => "GetSpec", + } + } +} + +pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_cplane_requests_total", + "Total number of control plane requests made by the compute_ctl", + &["method"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static CPLANE_REQUESTS_FAILED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_cplane_requests_failed", + "Total number of failed control plane requests made by the compute_ctl", + &["method", "http_status"] + ) + .expect("failed to define a metric") +}); + +/// Total number of failed database migrations. Per-compute, this is actually a boolean metric, +/// either empty or with a single value (1, migration_id) because we stop at the first failure. +/// Yet, the sum over the fleet will provide the total number of failures. +pub(crate) static DB_MIGRATION_FAILED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_db_migration_failed", + "Total number of failed database migrations", + &["migration_id"] + ) + .expect("failed to define a metric") +}); + +pub fn collect() -> Vec { + let mut metrics = INSTALLED_EXTENSIONS.collect(); + metrics.extend(CPLANE_REQUESTS_TOTAL.collect()); + metrics.extend(CPLANE_REQUESTS_FAILED.collect()); + metrics.extend(DB_MIGRATION_FAILED.collect()); + metrics +} diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs index 45c33172f7b4..aa3c6b01f035 100644 --- a/compute_tools/src/migration.rs +++ b/compute_tools/src/migration.rs @@ -1,7 +1,9 @@ use anyhow::{Context, Result}; use fail::fail_point; use postgres::{Client, Transaction}; -use tracing::info; +use tracing::{error, info}; + +use crate::metrics::DB_MIGRATION_FAILED; /// Runs a series of migrations on a target database pub(crate) struct MigrationRunner<'m> { @@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> { Ok(()) } - /// Run an individual migration - fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> { + /// Run an individual migration in a separate transaction block. + fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> { + let mut txn = client + .transaction() + .with_context(|| format!("begin transaction for migration {migration_id}"))?; + if migration.starts_with("-- SKIP") { info!("Skipping migration id={}", migration_id); // Even though we are skipping the migration, updating the // migration ID should help keep logic easy to understand when // trying to understand the state of a cluster. - Self::update_migration_id(txn, migration_id)?; + Self::update_migration_id(&mut txn, migration_id)?; } else { info!("Running migration id={}:\n{}\n", migration_id, migration); txn.simple_query(migration) .with_context(|| format!("apply migration {migration_id}"))?; - Self::update_migration_id(txn, migration_id)?; + Self::update_migration_id(&mut txn, migration_id)?; } + txn.commit() + .with_context(|| format!("commit transaction for migration {migration_id}"))?; + Ok(()) } @@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> { // The index lags the migration ID by 1, so the current migration // ID is also the next index let migration_id = (current_migration + 1) as i64; - - let mut txn = self - .client - .transaction() - .with_context(|| format!("begin transaction for migration {migration_id}"))?; - - Self::run_migration(&mut txn, migration_id, self.migrations[current_migration]) - .with_context(|| format!("running migration {migration_id}"))?; - - txn.commit() - .with_context(|| format!("commit transaction for migration {migration_id}"))?; - - info!("Finished migration id={}", migration_id); + let migration = self.migrations[current_migration]; + + match Self::run_migration(self.client, migration_id, migration) { + Ok(_) => { + info!("Finished migration id={}", migration_id); + } + Err(e) => { + error!("Failed to run migration id={}: {}", migration_id, e); + DB_MIGRATION_FAILED + .with_label_values(&[migration_id.to_string().as_str()]) + .inc(); + return Err(e); + } + } current_migration += 1; } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index c7d2deb0900f..f06c8ef99f7e 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -6,6 +6,7 @@ use std::path::Path; use tracing::{error, info, instrument, warn}; use crate::config; +use crate::metrics::{CPlaneRequestMethod::GetSpec, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL}; use crate::migration::MigrationRunner; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; @@ -19,7 +20,7 @@ use compute_api::spec::ComputeSpec; fn do_control_plane_request( uri: &str, jwt: &str, -) -> Result { +) -> Result)> { let resp = reqwest::blocking::Client::new() .get(uri) .header("Authorization", format!("Bearer {}", jwt)) @@ -28,34 +29,41 @@ fn do_control_plane_request( ( true, format!("could not perform spec request to control plane: {}", e), + None, ) })?; - match resp.status() { + let status = resp.status(); + match status { StatusCode::OK => match resp.json::() { Ok(spec_resp) => Ok(spec_resp), Err(e) => Err(( true, format!("could not deserialize control plane response: {}", e), + Some(status), )), }, - StatusCode::SERVICE_UNAVAILABLE => { - Err((true, "control plane is temporarily unavailable".to_string())) - } + StatusCode::SERVICE_UNAVAILABLE => Err(( + true, + "control plane is temporarily unavailable".to_string(), + Some(status), + )), StatusCode::BAD_GATEWAY => { // We have a problem with intermittent 502 errors now // https://github.com/neondatabase/cloud/issues/2353 // It's fine to retry GET request in this case. - Err((true, "control plane request failed with 502".to_string())) + Err(( + true, + "control plane request failed with 502".to_string(), + Some(status), + )) } // Another code, likely 500 or 404, means that compute is unknown to the control plane // or some internal failure happened. Doesn't make much sense to retry in this case. _ => Err(( false, - format!( - "unexpected control plane response status code: {}", - resp.status() - ), + format!("unexpected control plane response status code: {}", status), + Some(status), )), } } @@ -82,6 +90,9 @@ pub fn get_spec_from_control_plane( // - no spec for compute yet (Empty state) -> return Ok(None) // - got spec -> return Ok(Some(spec)) while attempt < 4 { + CPLANE_REQUESTS_TOTAL + .with_label_values(&[GetSpec.as_str()]) + .inc(); spec = match do_control_plane_request(&cp_uri, &jwt) { Ok(spec_resp) => match spec_resp.status { ControlPlaneComputeStatus::Empty => Ok(None), @@ -93,7 +104,13 @@ pub fn get_spec_from_control_plane( } } }, - Err((retry, msg)) => { + Err((retry, msg, status)) => { + let status_str = status + .map(|s| s.to_string()) + .unwrap_or("unknown".to_string()); + CPLANE_REQUESTS_FAILED + .with_label_values(&[GetSpec.as_str(), &status_str]) + .inc(); if retry { Err(anyhow!(msg)) } else {