Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compute): Add some basic compute_ctl metrics #10504

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 53 additions & 12 deletions compute_tools/src/extension_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ use tracing::info;
use tracing::log::warn;
use zstd::stream::read::Decoder;

use crate::metrics::{REMOTE_EXT_REQUESTS_FAILED, REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};

fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
Expand Down Expand Up @@ -258,21 +260,60 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res

info!("Download extension {:?} from uri {:?}", ext_path, uri);

let resp = reqwest::get(uri).await?;
REMOTE_EXT_REQUESTS_TOTAL.with_label_values(&[]).inc();

match do_extension_server_request(&uri).await {
Ok(resp) => {
info!(
"Successfully downloaded remote extension data {:?}",
ext_path
);
Ok(resp)
}
Err((msg, status)) => {
let status_str = status
.map(|s| s.to_string())
.unwrap_or(UNKNOWN_HTTP_STATUS.to_string());
REMOTE_EXT_REQUESTS_FAILED
.with_label_values(&[&status_str])
.inc();
bail!(msg);
}
}
}

// Do a single remote extensions server request.
// Return result or (error message + status code) in case of any failures.
async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, Option<StatusCode>)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!("could not perform remote extensions server request: {}", e),
None,
)
})?;
let status = resp.status();

match resp.status() {
match status {
StatusCode::OK => match resp.bytes().await {
Ok(resp) => {
info!("Download extension {:?} completed successfully", ext_path);
Ok(resp)
}
Err(e) => bail!("could not deserialize remote extension response: {}", e),
Ok(resp) => Ok(resp),
Err(e) => Err((
format!("could not read remote extensions server response: {}", e),
// It's fine to return and report error with status as 200 OK,
// because we still failed to read the response.
Some(status),
)),
},
StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
_ => bail!(
"unexpected remote extension response status code: {}",
resp.status()
),
StatusCode::SERVICE_UNAVAILABLE => Err((
"remote extensions server is temporarily unavailable".to_string(),
Some(status),
)),
_ => Err((
format!(
"unexpected remote extensions server response status code: {}",
status
),
Some(status),
)),
}
}

Expand Down
7 changes: 3 additions & 4 deletions compute_tools/src/http/routes/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ use axum::{body::Body, response::Response};
use http::header::CONTENT_TYPE;
use http::StatusCode;
use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use metrics::{Encoder, TextEncoder};

use crate::{http::JsonResponse, installed_extensions};
use crate::{http::JsonResponse, metrics::collect};

/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = installed_extensions::collect()
let metrics = collect()
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
Expand Down
18 changes: 1 addition & 17 deletions compute_tools/src/installed_extensions.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;

use anyhow::Result;
use postgres::{Client, NoTls};

use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
use crate::metrics::INSTALLED_EXTENSIONS;

/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
Expand Down Expand Up @@ -102,16 +99,3 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
extensions: extensions_map.into_values().collect(),
})
}

static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});

pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}
1 change: 1 addition & 0 deletions compute_tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod params;
Expand Down
90 changes: 90 additions & 0 deletions compute_tools/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use metrics::core::Collector;
use metrics::proto::MetricFamily;
use metrics::{register_int_counter_vec, register_uint_gauge_vec, IntCounterVec, UIntGaugeVec};
use once_cell::sync::Lazy;

pub(crate) static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});

// Normally, any HTTP API request is described by METHOD (e.g. GET, POST, etc.) + PATH,
// but for all our APIs we defined a 'slug'/method/operationId in the OpenAPI spec.
// And it's fair to call it a 'RPC' (Remote Procedure Call).
pub enum CPlaneRequestRPC {
GetSpec,
}

impl CPlaneRequestRPC {
pub fn as_str(&self) -> &str {
match self {
CPlaneRequestRPC::GetSpec => "GetSpec",
}
}
}

pub const UNKNOWN_HTTP_STATUS: &str = "unknown";

pub(crate) static CPLANE_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
tristan957 marked this conversation as resolved.
Show resolved Hide resolved
register_int_counter_vec!(
"compute_ctl_cplane_requests_total",
"Total number of control plane requests made by compute_ctl",
&["rpc"]
)
.expect("failed to define a metric")
});

pub(crate) static CPLANE_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_cplane_requests_failed_total",
"Total number of failed control plane requests made by compute_ctl",
&["rpc", "http_status"]
)
.expect("failed to define a metric")
});

/// Total number of failed database migrations. Per-compute, this is actually a boolean metric,
/// either empty or with a single value (1, migration_id) because we stop at the first failure.
/// Yet, the sum over the fleet will provide the total number of failures.
pub(crate) static DB_MIGRATION_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_db_migration_failed_total",
"Total number of failed database migrations",
&["migration_id"]
)
.expect("failed to define a metric")
});

pub(crate) static REMOTE_EXT_REQUESTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_total",
"Total number of requests made by compute_ctl to download extensions from S3 proxy",
// Do not use any labels like extension name yet.
// We can add them later if needed.
&[]
)
.expect("failed to define a metric")
});

pub(crate) static REMOTE_EXT_REQUESTS_FAILED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"compute_ctl_remote_ext_requests_failed_total",
"Total number of failed requests to S3 proxy",
&["http_status"]
)
.expect("failed to define a metric")
});

pub fn collect() -> Vec<MetricFamily> {
let mut metrics = INSTALLED_EXTENSIONS.collect();
metrics.extend(CPLANE_REQUESTS_TOTAL.collect());
metrics.extend(CPLANE_REQUESTS_FAILED.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(REMOTE_EXT_REQUESTS_FAILED.collect());
metrics
}
46 changes: 28 additions & 18 deletions compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::{Client, Transaction};
use tracing::info;
use tracing::{error, info};

use crate::metrics::DB_MIGRATION_FAILED;

/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
Expand Down Expand Up @@ -78,24 +80,31 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}

/// Run an individual migration
fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
/// Run an individual migration in a separate transaction block.
fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
let mut txn = client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id);

// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
} else {
info!("Running migration id={}:\n{}\n", migration_id, migration);

txn.simple_query(migration)
.with_context(|| format!("apply migration {migration_id}"))?;

Self::update_migration_id(txn, migration_id)?;
Self::update_migration_id(&mut txn, migration_id)?;
}

txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;

Ok(())
}

Expand All @@ -109,19 +118,20 @@ impl<'m> MigrationRunner<'m> {
// The index lags the migration ID by 1, so the current migration
// ID is also the next index
let migration_id = (current_migration + 1) as i64;

let mut txn = self
.client
.transaction()
.with_context(|| format!("begin transaction for migration {migration_id}"))?;

Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
.with_context(|| format!("running migration {migration_id}"))?;

txn.commit()
.with_context(|| format!("commit transaction for migration {migration_id}"))?;

info!("Finished migration id={}", migration_id);
let migration = self.migrations[current_migration];

match Self::run_migration(self.client, migration_id, migration) {
Ok(_) => {
info!("Finished migration id={}", migration_id);
}
Err(e) => {
error!("Failed to run migration id={}: {}", migration_id, e);
DB_MIGRATION_FAILED
.with_label_values(&[migration_id.to_string().as_str()])
.inc();
return Err(e);
}
}

current_migration += 1;
}
Expand Down
Loading
Loading