Skip to content

Commit

Permalink
feat(compute): Add some basic compute_ctl metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ololobus committed Jan 24, 2025
1 parent 9f1408f commit c3df6c5
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 50 deletions.
7 changes: 3 additions & 4 deletions compute_tools/src/http/routes/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ use axum::{body::Body, response::Response};
use http::header::CONTENT_TYPE;
use http::StatusCode;
use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use metrics::{Encoder, TextEncoder};

use crate::{http::JsonResponse, installed_extensions};
use crate::{http::JsonResponse, metrics::collect};

/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = installed_extensions::collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
Expand Down
18 changes: 1 addition & 17 deletions compute_tools/src/installed_extensions.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;

use anyhow::Result;
use postgres::{Client, NoTls};

use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
use crate::metrics::INSTALLED_EXTENSIONS;

/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
Expand Down Expand Up @@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
extensions: extensions_map.into_values().collect(),
})
}

/// Gauge vector `compute_installed_extensions`, labelled by `extension_name`,
/// `version` and `owned_by_superuser`.
///
/// The metric is defined lazily, the first time this static is dereferenced;
/// a failure to define it panics with "failed to define a metric".
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});

/// Returns the metric families currently held by `INSTALLED_EXTENSIONS`.
pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}
1 change: 1 addition & 0 deletions compute_tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod params;
Expand Down
63 changes: 63 additions & 0 deletions compute_tools/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use metrics::core::Collector;
use metrics::proto::MetricFamily;
use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec};
use once_cell::sync::Lazy;

/// Gauge vector `compute_installed_extensions`: number of databases in which a
/// given extension version is installed.
///
/// Labels: `extension_name`, `version`, `owned_by_superuser`.
///
/// The metric is defined lazily on first access; a failure to define it panics.
pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
    let label_names: &[&str] = &["extension_name", "version", "owned_by_superuser"];
    let gauge = register_uint_gauge_vec!(
        "compute_installed_extensions",
        "Number of databases where the version of extension is installed",
        label_names
    );
    gauge.expect("failed to define a metric")
});

/// Kinds of requests that compute_ctl sends to the control plane.
///
/// The string form of a variant (see [`CPlaneRequestMethod::as_str`]) is used
/// as the `method` label value of the control plane request metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CPlaneRequestMethod {
    /// Request for the compute spec.
    GetSpec,
}

impl CPlaneRequestMethod {
    /// Returns the metric label value for this request method.
    ///
    /// The returned string is a `'static` literal, so it is not tied to the
    /// lifetime of the borrowed enum value and can be stored or passed on
    /// freely.
    pub fn as_str(&self) -> &'static str {
        match self {
            CPlaneRequestMethod::GetSpec => "GetSpec",
        }
    }
}

/// Counter vector `compute_ctl_cplane_requests_total`: total number of control
/// plane requests made by compute_ctl.
///
/// Labels: `method`.
///
/// The metric is defined lazily on first access; a failure to define it panics.
pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    let label_names: &[&str] = &["method"];
    let counter = register_int_counter_vec!(
        "compute_ctl_cplane_requests_total",
        "Total number of control plane requests made by the compute_ctl",
        label_names
    );
    counter.expect("failed to define a metric")
});

/// Counter vector `compute_ctl_cplane_requests_failed`: total number of failed
/// control plane requests made by compute_ctl.
///
/// Labels: `method`, `http_status`.
///
/// The metric is defined lazily on first access; a failure to define it panics.
pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
    let label_names: &[&str] = &["method", "http_status"];
    let counter = register_int_counter_vec!(
        "compute_ctl_cplane_requests_failed",
        "Total number of failed control plane requests made by the compute_ctl",
        label_names
    );
    counter.expect("failed to define a metric")
});

/// Counter vector `compute_ctl_db_migration_failed`: total number of failed
/// database migrations, labelled by `migration_id`.
///
/// On a single compute this behaves like a boolean: the vector is either empty
/// or holds exactly one series with value 1 for the failing `migration_id`,
/// because migrations stop at the first failure. Summed over the fleet, it
/// still yields the total number of failures.
///
/// The metric is defined lazily on first access; a failure to define it panics.
pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
    let label_names: &[&str] = &["migration_id"];
    let counter = register_int_counter_vec!(
        "compute_ctl_db_migration_failed",
        "Total number of failed database migrations",
        label_names
    );
    counter.expect("failed to define a metric")
});

/// Gathers the metric families of all compute_ctl metrics defined in this module.
///
/// Families are returned in a fixed order: installed extensions, total control
/// plane requests, failed control plane requests, failed database migrations.
pub fn collect() -> Vec<MetricFamily> {
    INSTALLED_EXTENSIONS
        .collect()
        .into_iter()
        .chain(CPLANE_REQUESTS_TOTAL.collect())
        .chain(CPLANE_REQUESTS_FAILED.collect())
        .chain(DB_MIGRATION_FAILED.collect())
        .collect()
}
46 changes: 28 additions & 18 deletions compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::{Client, Transaction};
use tracing::info;
use tracing::{error, info};

use crate::metrics::DB_MIGRATION_FAILED;

/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
Expand Down Expand Up @@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}

/// Run an individual migration
fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
/// Run an individual migration in a separate transaction block.
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id);

// Even though we are skipping the migration, we still update the
// migration ID: this keeps the logic simple and makes the state of a
// cluster easier to reason about.
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);

txn.simple_query(migration)
.with_context(|| format!("apply migration {migration_id}"))?;

Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
}

txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;

Ok(())
}

Expand All @@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;

let mut txn = self
.client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;

Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
.with_context(|| format!("running migration {migration_id}"))?;

txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;

info!("Finished migration id={}", migration_id);
let migration = self.migrations[current_migration];

match Self::run_migration(self.client, migration_id, migration) {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}
Err(e) => {
error!("Failed to run migration id={}: {}", migration_id, e);
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(e);
}
}

current_migration += 1;
}
Expand Down
39 changes: 28 additions & 11 deletions compute_tools/src/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::Path;
use tracing::{error, info, instrument, warn};

use crate::config;
use crate::metrics::{CPlaneRequestMethod::GetSpec, CPLANE_REQUESTS_FAILED, CPLANE_REQUESTS_TOTAL};
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
Expand All @@ -19,7 +20,7 @@ use compute_api::spec::ComputeSpec;
fn do_control_plane_request(
uri: &str,
jwt: &str,
) -> Result<ControlPlaneSpecResponse, (bool, String)> {
) -> Result<ControlPlaneSpecResponse, (bool, String, Option<StatusCode>)> {
let resp = reqwest::blocking::Client::new()
.get(uri)
.header("Authorization", format!("Bearer {}", jwt))
Expand All @@ -28,34 +29,41 @@ fn do_control_plane_request(
(
true,
format!("could not perform spec request to control plane: {}", e),
None,
)
})?;

match resp.status() {
let status = resp.status();
match status {
StatusCode::OK => match resp.json::<ControlPlaneSpecResponse>() {
Ok(spec_resp) => Ok(spec_resp),
Err(e) => Err((
true,
format!("could not deserialize control plane response: {}", e),
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => {
Err((true, "control plane is temporarily unavailable".to_string()))
}
StatusCode::SERVICE_UNAVAILABLE => Err((
true,
"control plane is temporarily unavailable".to_string(),
Some(status),
)),
StatusCode::BAD_GATEWAY => {
// We have a problem with intermittent 502 errors now
// https://github.com/neondatabase/cloud/issues/2353
// It's fine to retry GET request in this case.
Err((true, "control plane request failed with 502".to_string()))
Err((
true,
"control plane request failed with 502".to_string(),
Some(status),
))
}
// Any other code, likely 500 or 404, means that the compute is unknown to the control plane
// or that some internal failure happened. It doesn't make much sense to retry in this case.
_ => Err((
false,
format!(
"unexpected control plane response status code: {}",
resp.status()
),
format!("unexpected control plane response status code: {}", status),
Some(status),
)),
}
}
Expand All @@ -82,6 +90,9 @@ pub fn get_spec_from_control_plane(
// - no spec for compute yet (Empty state) -> return Ok(None)
// - got spec -> return Ok(Some(spec))
while attempt < 4 {
CPLANE_REQUESTS_TOTAL
.with_label_values(&[GetSpec.as_str()])
.inc();
spec = match do_control_plane_request(&cp_uri, &jwt) {
Ok(spec_resp) => match spec_resp.status {
ControlPlaneComputeStatus::Empty => Ok(None),
Expand All @@ -93,7 +104,13 @@ pub fn get_spec_from_control_plane(
}
}
},
Err((retry, msg)) => {
Err((retry, msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or("unknown".to_string());
CPLANE_REQUESTS_FAILED
.with_label_values(&[GetSpec.as_str(), &status_str])
.inc();
if retry {
Err(anyhow!(msg))
} else {
Expand Down

0 comments on commit c3df6c5

Please sign in to comment.