Skip to content

Commit

Permalink
feat(compute): Add some basic compute_ctl metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ololobus committed Jan 27, 2025
1 parent c8fbbb9 commit de78bcd
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 64 deletions.
65 changes: 53 additions & 12 deletions compute_tools/src/extension_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ use tracing::info;
use tracing::log::warn;
use zstd::stream::read::Decoder;

use crate::metrics::{REMOTE_EXT_REQUESTS_FAILED, REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};

fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
Expand Down Expand Up @@ -258,21 +260,60 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res

info!("Download extension {:?} from uri {:?}", ext_path, uri);

let resp = reqwest::get(uri).await?;
REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc();

match do_extension_server_request(&uri).await {
Ok(resp) => {
info!(
"Successfully downloaded remote extension data {:?}",
ext_path
);
Ok(resp)
}
Err((msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
REMOTE_EXT_REQUESTS_FAILED
.with_label_values(&[&status_str])
.inc();
bail!(msg);
}
}
}

// Do a single remote extensions server request.
// Return result or (error message + status code) in case of any failures.
async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option<StatusCode>)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!("could not perform remote extensions server request: {}", e),
None,
)
})?;
let status = resp.status();

match resp.status() {
match status {
StatusCode::OK => match resp.bytes().await {
Ok(resp) => {
info!("Download extension {:?} completed successfully", ext_path);
Ok(resp)
}
Err(e) => bail!("could not deserialize remote extension response: {}", e),
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {}", e),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
_ => bail!(
"unexpected remote extension response status code: {}",
resp.status()
),
StatusCode::SERVICE_UNAVAILABLE => Err((
"remote extensions server is temporarily unavailable".to_string(),
Some(status),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
Some(status),
)),
}
}

Expand Down
7 changes: 3 additions & 4 deletions compute_tools/src/http/routes/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ use axum::{body::Body, response::Response};
use http::header::CONTENT_TYPE;
use http::StatusCode;
use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use metrics::{Encoder, TextEncoder};

use crate::{http::JsonResponse, installed_extensions};
use crate::{http::JsonResponse, metrics::collect};

/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = installed_extensions::collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
Expand Down
18 changes: 1 addition & 17 deletions compute_tools/src/installed_extensions.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;

use anyhow::Result;
use postgres::{Client, NoTls};

use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
use crate::metrics::INSTALLED_EXTENSIONS;

/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
Expand Down Expand Up @@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
extensions: extensions_map.into_values().collect(),
})
}

static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});

pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}
1 change: 1 addition & 0 deletions compute_tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod params;
Expand Down
90 changes: 90 additions & 0 deletions compute_tools/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use metrics::core::Collector;
use metrics::proto::MetricFamily;
use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec};
use once_cell::sync::Lazy;

/// Gauge vector `compute_installed_extensions`.
///
/// Per its help text it holds the number of databases where a given version of
/// an extension is installed. Series are labeled by `extension_name`, `version`
/// and `owned_by_superuser`.
///
/// Registration runs inside the `Lazy` initializer, i.e. on first access, and
/// panics through `expect` if the metric cannot be defined.
pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});

/// Identifies a control plane API operation requested by compute_ctl.
///
/// Normally, any HTTP API request is described by METHOD (e.g. GET, POST, etc.) + PATH,
/// but for all our APIs we defined a 'slug'/method/operationId in the OpenAPI spec.
/// So it is fair to call it an 'RPC' (Remote Procedure Call).
///
/// NOTE(review): presumably `as_str()` supplies the value of the `rpc` label on the
/// control plane request counters in this module -- confirm against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CPlaneRequestRPC {
    /// The `GetSpec` operation.
    GetSpec,
}

impl CPlaneRequestRPC {
    /// Returns the operation name as a string.
    ///
    /// Every variant maps to a string literal, so the result is `'static` and
    /// may outlive the value it was obtained from.
    pub fn as_str(&self) -> &'static str {
        match self {
            CPlaneRequestRPC::GetSpec => "GetSpec",
        }
    }
}

/// Placeholder value for an `http_status` metric label when a failed request
/// produced no HTTP status code at all (for example, the request itself could
/// not be performed, so there is no response to take a status from).
pub const UNKNOWN_HTTP_STATUS: &str = "unknown";

/// Counter vector `compute_ctl_cplane_requests_total`.
///
/// Per its help text it counts all control plane requests made by compute_ctl.
/// Series are labeled by `rpc`.
///
/// Registration runs inside the `Lazy` initializer, i.e. on first access, and
/// panics through `expect` if the metric cannot be defined.
pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_total",
"Total number of control plane requests made by compute_ctl",
&["rpc"]
)
.expect("failed to define a metric")
});

/// Counter vector `compute_ctl_cplane_requests_failed_total`.
///
/// Per its help text it counts failed control plane requests made by
/// compute_ctl. Series are labeled by `rpc` and `http_status`.
///
/// Registration runs inside the `Lazy` initializer, i.e. on first access, and
/// panics through `expect` if the metric cannot be defined.
pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_failed_total",
"Total number of failed control plane requests made by compute_ctl",
&["rpc", "http_status"]
)
.expect("failed to define a metric")
});

/// Counter vector `compute_ctl_db_migration_failed_total`: total number of failed
/// database migrations, labeled by `migration_id`.
///
/// Per-compute, this is effectively a boolean metric: it is either empty or holds a
/// single series with value 1 for the failing `migration_id`, because we stop at the
/// first failure. Yet, the sum over the fleet will provide the total number of failures.
///
/// Registration runs inside the `Lazy` initializer, i.e. on first access, and
/// panics through `expect` if the metric cannot be defined.
pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_db_migration_failed_total",
"Total number of failed database migrations",
&["migration_id"]
)
.expect("failed to define a metric")
});

/// Counter vector `compute_ctl_remote_ext_requests_total`.
///
/// Per its help text it counts the requests made by compute_ctl to download
/// extensions from S3 proxy. It is declared with an empty label set, so callers
/// address its only series with an empty label-value slice.
///
/// Registration runs inside the `Lazy` initializer, i.e. on first access, and
/// panics through `expect` if the metric cannot be defined.
pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_total",
"Total number of requests made by compute_ctl to download extensions from S3 proxy",
// Do not use any labels like extension name yet.
// We can add them later if needed.
&[]
)
.expect("failed to define a metric")
});

/// Counter vector `compute_ctl_remote_ext_requests_failed_total`.
///
/// Per its help text it counts failed requests to S3 proxy. Series are labeled
/// by `http_status`.
///
/// Registration runs inside the `Lazy` initializer, i.e. on first access, and
/// panics through `expect` if the metric cannot be defined.
pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_failed_total",
"Total number of failed requests to S3 proxy",
&["http_status"]
)
.expect("failed to define a metric")
});

pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(CPLANE_REQUESTS_FAILED.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_FAILED.collect());
metrics
}
46 changes: 28 additions & 18 deletions compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::{Client, Transaction};
use tracing::info;
use tracing::{error, info};

use crate::metrics::DB_MIGRATION_FAILED;

/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
Expand Down Expand Up @@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}

/// Run an individual migration
fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
/// Run an individual migration in a separate transaction block.
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id);

// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);

txn.simple_query(migration)
.with_context(|| format!("apply migration {migration_id}"))?;

Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
}

txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;

Ok(())
}

Expand All @@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;

let mut txn = self
.client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;

Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
.with_context(|| format!("running migration {migration_id}"))?;

txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;

info!("Finished migration id={}", migration_id);
let migration = self.migrations[current_migration];

match Self::run_migration(self.client, migration_id, migration) {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}
Err(e) => {
error!("Failed to run migration id={}: {}", migration_id, e);
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(e);
}
}

current_migration += 1;
}
Expand Down
Loading

0 comments on commit de78bcd

Please sign in to comment.