From de78bcdfb1792a6872f9acbc938d7effdc3bf63b Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Fri, 24 Jan 2025 21:21:28 +0100 Subject: [PATCH] feat(compute): Add some basic compute_ctl metrics --- compute_tools/src/extension_server.rs | 65 +++++++++++--- compute_tools/src/http/routes/metrics.rs | 7 +- compute_tools/src/installed_extensions.rs | 18 +--- compute_tools/src/lib.rs | 1 + compute_tools/src/metrics.rs | 90 +++++++++++++++++++ compute_tools/src/migration.rs | 46 ++++++---- compute_tools/src/spec.rs | 41 ++++++--- .../regress/test_compute_migrations.py | 13 ++- .../regress/test_download_extensions.py | 14 ++- 9 files changed, 231 insertions(+), 64 deletions(-) create mode 100644 compute_tools/src/metrics.rs diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index f13b2308e70e..fa638c74b3b5 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -85,6 +85,8 @@ use tracing::info; use tracing::log::warn; use zstd::stream::read::Decoder; +use crate::metrics::{REMOTE_EXT_REQUESTS_FAILED, REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS}; + fn get_pg_config(argument: &str, pgbin: &str) -> String { // gives the result of `pg_config [argument]` // where argument is a flag like `--version` or `--sharedir` @@ -258,21 +260,60 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> info!("Download extension {:?} from uri {:?}", ext_path, uri); - let resp = reqwest::get(uri).await?; + REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc(); + + match do_extension_server_request(&uri).await { + Ok(resp) => { + info!( + "Successfully downloaded remote extension data {:?}", + ext_path + ); + Ok(resp) + } + Err((msg, status)) => { + let status_str = status + .map(|s| s.to_string()) + .unwrap_or(UNKNOWN_HTTP_STATUS.to_string()); + REMOTE_EXT_REQUESTS_FAILED + .with_label_values(&[&status_str]) + .inc(); + bail!(msg); + } + } } + +// Do a single remote extensions 
server request. +// Return result or (error message + status code) in case of any failures. +async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option<StatusCode>)> { + let resp = reqwest::get(uri).await.map_err(|e| { + ( + format!("could not perform remote extensions server request: {}", e), + None, + ) + })?; + let status = resp.status(); - match resp.status() { + match status { StatusCode::OK => match resp.bytes().await { - Ok(resp) => { - info!("Download extension {:?} completed successfully", ext_path); - Ok(resp) - } - Err(e) => bail!("could not deserialize remote extension response: {}", e), + Ok(resp) => Ok(resp), + Err(e) => Err(( + format!("could not read remote extensions server response: {}", e), + // It's fine to return and report error with status as 200 OK, + // because we still failed to read the response. + Some(status), + )), }, - StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"), - _ => bail!( - "unexpected remote extension response status code: {}", - resp.status() - ), + StatusCode::SERVICE_UNAVAILABLE => Err(( + "remote extensions server is temporarily unavailable".to_string(), + Some(status), + )), + _ => Err(( + format!( + "unexpected remote extensions server response status code: {}", + status + ), + Some(status), + )), } } diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs index 40d71b5de7c6..13150a75888b 100644 --- a/compute_tools/src/http/routes/metrics.rs +++ b/compute_tools/src/http/routes/metrics.rs @@ -2,17 +2,16 @@ use axum::{body::Body, response::Response}; use http::header::CONTENT_TYPE; use http::StatusCode; use metrics::proto::MetricFamily; -use metrics::Encoder; -use metrics::TextEncoder; +use metrics::{Encoder, TextEncoder}; -use crate::{http::JsonResponse, installed_extensions}; +use crate::{http::JsonResponse, metrics::collect}; /// Expose Prometheus metrics. 
pub(in crate::http) async fn get_metrics() -> Response { // When we call TextEncoder::encode() below, it will immediately return an // error if a metric family has no metrics, so we need to preemptively // filter out metric families with no metrics. - let metrics = installed_extensions::collect() + let metrics = collect() .into_iter() .filter(|m| !m.get_metric().is_empty()) .collect::<Vec<MetricFamily>>(); diff --git a/compute_tools/src/installed_extensions.rs b/compute_tools/src/installed_extensions.rs index 0ab259ddf119..173dbf40b0c2 100644 --- a/compute_tools/src/installed_extensions.rs +++ b/compute_tools/src/installed_extensions.rs @@ -1,13 +1,10 @@ use compute_api::responses::{InstalledExtension, InstalledExtensions}; -use metrics::proto::MetricFamily; use std::collections::HashMap; use anyhow::Result; use postgres::{Client, NoTls}; -use metrics::core::Collector; -use metrics::{register_uint_gauge_vec, UIntGaugeVec}; -use once_cell::sync::Lazy; +use crate::metrics::INSTALLED_EXTENSIONS; /// We don't reuse get_existing_dbs() just for code clarity /// and to make database listing query here more explicit. 
@@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<InstalledExtensions> { -pub static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| { - register_uint_gauge_vec!( - "compute_installed_extensions", - "Number of databases where the version of extension is installed", - &["extension_name", "version", "owned_by_superuser"] - ) - .expect("failed to define a metric") -}); - -pub fn collect() -> Vec<MetricFamily> { - INSTALLED_EXTENSIONS.collect() -} diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 12fea4e61a09..b08df22134f3 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -16,6 +16,7 @@ pub mod extension_server; pub mod installed_extensions; pub mod local_proxy; pub mod lsn_lease; +pub mod metrics; mod migration; pub mod monitor; pub mod params; diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs new file mode 100644 index 000000000000..36843385712a --- /dev/null +++ b/compute_tools/src/metrics.rs @@ -0,0 +1,90 @@ +use metrics::core::Collector; +use metrics::proto::MetricFamily; +use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec}; +use once_cell::sync::Lazy; + +pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| { + register_uint_gauge_vec!( + "compute_installed_extensions", + "Number of databases where the version of extension is installed", + &["extension_name", "version", "owned_by_superuser"] + ) + .expect("failed to define a metric") +}); + +// Normally, any HTTP API request is described by METHOD (e.g. GET, POST, etc.) + PATH, +// but for all our APIs we defined a 'slug'/method/operationId in the OpenAPI spec. +// And it's fair to call it a 'RPC' (Remote Procedure Call). 
+pub enum CPlaneRequestRPC { + GetSpec, +} + +impl CPlaneRequestRPC { + pub fn as_str(&self) -> &str { + match self { + CPlaneRequestRPC::GetSpec => "GetSpec", + } + } +} + +pub const UNKNOWN_HTTP_STATUS: &str = "unknown"; + +pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_cplane_requests_total", + "Total number of control plane requests made by compute_ctl", + &["rpc"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_cplane_requests_failed_total", + "Total number of failed control plane requests made by compute_ctl", + &["rpc", "http_status"] + ) + .expect("failed to define a metric") +}); + +/// Total number of failed database migrations. Per-compute, this is actually a boolean metric, +/// either empty or with a single value (1, migration_id) because we stop at the first failure. +/// Yet, the sum over the fleet will provide the total number of failures. +pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_db_migration_failed_total", + "Total number of failed database migrations", + &["migration_id"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_remote_ext_requests_total", + "Total number of requests made by compute_ctl to download extensions from S3 proxy", + // Do not use any labels like extension name yet. + // We can add them later if needed. 
+ &[] + ) + .expect("failed to define a metric") +}); + +pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| { + register_int_counter_vec!( + "compute_ctl_remote_ext_requests_failed_total", + "Total number of failed requests to S3 proxy", + &["http_status"] + ) + .expect("failed to define a metric") +}); + +pub fn collect() -> Vec<MetricFamily> { + let mut metrics = INSTALLED_EXTENSIONS.collect(); + metrics.extend(CPLANE_REQUESTS_TOTAL.collect()); + metrics.extend(CPLANE_REQUESTS_FAILED.collect()); + metrics.extend(DB_MIGRATION_FAILED.collect()); + metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect()); + metrics.extend(REMOTE_EXT_REQUESTS_FAILED.collect()); + metrics +} diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs index 45c33172f7b4..aa3c6b01f035 100644 --- a/compute_tools/src/migration.rs +++ b/compute_tools/src/migration.rs @@ -1,7 +1,9 @@ use anyhow::{Context, Result}; use fail::fail_point; use postgres::{Client, Transaction}; -use tracing::info; +use tracing::{error, info}; + +use crate::metrics::DB_MIGRATION_FAILED; /// Runs a series of migrations on a target database pub(crate) struct MigrationRunner<'m> { @@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> { Ok(()) } - /// Run an individual migration - fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> { + /// Run an individual migration in a separate transaction block. + fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> { + let mut txn = client + .transaction() + .with_context(|| format!("begin transaction for migration {migration_id}"))?; + if migration.starts_with("-- SKIP") { info!("Skipping migration id={}", migration_id); // Even though we are skipping the migration, updating the // migration ID should help keep logic easy to understand when // trying to understand the state of a cluster. 
- Self::update_migration_id(txn, migration_id)?; + Self::update_migration_id(&mut txn, migration_id)?; } else { info!("Running migration id={}:\n{}\n", migration_id, migration); txn.simple_query(migration) .with_context(|| format!("apply migration {migration_id}"))?; - Self::update_migration_id(txn, migration_id)?; + Self::update_migration_id(&mut txn, migration_id)?; } + txn.commit() + .with_context(|| format!("commit transaction for migration {migration_id}"))?; + Ok(()) } @@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> { // The index lags the migration ID by 1, so the current migration // ID is also the next index let migration_id = (current_migration + 1) as i64; - - let mut txn = self - .client - .transaction() - .with_context(|| format!("begin transaction for migration {migration_id}"))?; - - Self::run_migration(&mut txn, migration_id, self.migrations[current_migration]) - .with_context(|| format!("running migration {migration_id}"))?; - - txn.commit() - .with_context(|| format!("commit transaction for migration {migration_id}"))?; - - info!("Finished migration id={}", migration_id); + let migration = self.migrations[current_migration]; + + match Self::run_migration(self.client, migration_id, migration) { + Ok(_) => { + info!("Finished migration id={}", migration_id); + } + Err(e) => { + error!("Failed to run migration id={}: {}", migration_id, e); + DB_MIGRATION_FAILED + .with_label_values(&[migration_id.to_string().as_str()]) + .inc(); + return Err(e); + } + } current_migration += 1; } diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index c7d2deb0900f..01de13811f28 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -6,6 +6,9 @@ use std::path::Path; use tracing::{error, info, instrument, warn}; use crate::config; +use crate::metrics::{ + CPlaneRequestRPC, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS, +}; use crate::migration::MigrationRunner; use crate::params::PG_HBA_ALL_MD5; use 
crate::pg_helpers::*; @@ -19,7 +22,7 @@ use compute_api::spec::ComputeSpec; fn do_control_plane_request( uri: &str, jwt: &str, -) -> Result<ControlPlaneSpecResponse, (bool, String)> { +) -> Result<ControlPlaneSpecResponse, (bool, String, Option<StatusCode>)> { let resp = reqwest::blocking::Client::new() .get(uri) .header("Authorization", format!("Bearer {}", jwt)) @@ -28,34 +31,41 @@ ( true, format!("could not perform spec request to control plane: {}", e), + None, ) })?; - match resp.status() { + let status = resp.status(); + match status { StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() { Ok(spec_resp) => Ok(spec_resp), Err(e) => Err(( true, format!("could not deserialize control plane response: {}", e), + Some(status), )), }, - StatusCode::SERVICE_UNAVAILABLE => { - Err((true, "control plane is temporarily unavailable".to_string())) - } + StatusCode::SERVICE_UNAVAILABLE => Err(( + true, + "control plane is temporarily unavailable".to_string(), + Some(status), + )), StatusCode::BAD_GATEWAY => { // We have a problem with intermittent 502 errors now // https://github.com/neondatabase/cloud/issues/2353 // It's fine to retry GET request in this case. - Err((true, "control plane request failed with 502".to_string())) + Err(( + true, + "control plane request failed with 502".to_string(), + Some(status), + )) } // Another code, likely 500 or 404, means that compute is unknown to the control plane // or some internal failure happened. Doesn't make much sense to retry in this case. 
_ => Err(( false, - format!( - "unexpected control plane response status code: {}", - resp.status() - ), + format!("unexpected control plane response status code: {}", status), + Some(status), )), } } @@ -82,6 +92,9 @@ pub fn get_spec_from_control_plane( // - no spec for compute yet (Empty state) -> return Ok(None) // - got spec -> return Ok(Some(spec)) while attempt < 4 { + CPLANE_REQUESTS_TOTAL + .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str()]) + .inc(); spec = match do_control_plane_request(&cp_uri, &jwt) { Ok(spec_resp) => match spec_resp.status { ControlPlaneComputeStatus::Empty => Ok(None), @@ -93,7 +106,13 @@ pub fn get_spec_from_control_plane( } } }, - Err((retry, msg)) => { + Err((retry, msg, status)) => { + let status_str = status + .map(|s| s.to_string()) + .unwrap_or(UNKNOWN_HTTP_STATUS.to_string()); + CPLANE_REQUESTS_FAILED + .with_label_values(&[CPlaneRequestRPC::GetSpec.as_str(), &status_str]) + .inc(); if retry { Err(anyhow!(msg)) } else { diff --git a/test_runner/regress/test_compute_migrations.py b/test_runner/regress/test_compute_migrations.py index 803702a6f8ac..710959606fac 100644 --- a/test_runner/regress/test_compute_migrations.py +++ b/test_runner/regress/test_compute_migrations.py @@ -5,11 +5,11 @@ import pytest from fixtures.compute_migrations import COMPUTE_MIGRATIONS, NUM_COMPUTE_MIGRATIONS +from fixtures.metrics import parse_metrics if TYPE_CHECKING: from fixtures.neon_fixtures import NeonEnv - def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_dir: Path): """ Test that compute_ctl can recover from migration failures next time it @@ -33,6 +33,17 @@ def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_d migration_id = cast("int", cur.fetchall()[0][0]) assert migration_id == i - 1 + # Check that migration failure is properly recorded in the metrics + client = endpoint.http_client() + raw_metrics = client.metrics() + metrics = parse_metrics(raw_metrics) + failed_migration = 
metrics.query_all( + "compute_ctl_db_migration_failed_total", + ) + assert len(failed_migration) == 1 + for sample in failed_migration: + assert sample.value == 1 + endpoint.stop() endpoint.start() diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index f18f4e78bd19..d7e6e9de5642 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -8,6 +8,7 @@ import pytest from fixtures.log_helper import log +from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import ( NeonEnvBuilder, ) @@ -128,6 +129,17 @@ def endpoint_handler_build_tag(request: Request) -> Response: httpserver.check() + # Check that we properly recorded downloads in the metrics + client = endpoint.http_client() + raw_metrics = client.metrics() + metrics = parse_metrics(raw_metrics) + remote_ext_requests = metrics.query_all( + "compute_ctl_remote_ext_requests_total", + ) + assert len(remote_ext_requests) == 1 + for sample in remote_ext_requests: + assert sample.value == 1 + # TODO # 1. Test downloading remote library. @@ -137,7 +149,7 @@ def endpoint_handler_build_tag(request: Request) -> Response: # # 3.Test that extension is downloaded after endpoint restart, # when the library is used in the query. -# Run the test with mutliple simultaneous connections to an endpoint. +# Run the test with multiple simultaneous connections to an endpoint. # to ensure that the extension is downloaded only once. # # 4. Test that private extensions are only downloaded when they are present in the spec.