From 026b4cc003fb80b015ce74086b77c9bf636219c7 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 13 Dec 2024 10:36:33 +0100 Subject: [PATCH 1/4] move term to safekeeper_api --- libs/safekeeper_api/src/lib.rs | 6 ++++++ libs/safekeeper_api/src/models.rs | 15 +++++++++++++++ libs/safekeeper_api/src/state.rs | 1 + safekeeper/src/control_file_upgrade.rs | 3 ++- safekeeper/src/handler.rs | 2 +- safekeeper/src/http/routes.rs | 21 +++++---------------- safekeeper/src/json_ctrl.rs | 3 ++- safekeeper/src/pull_timeline.rs | 2 +- safekeeper/src/recovery.rs | 5 +++-- safekeeper/src/safekeeper.rs | 6 ++---- safekeeper/src/send_wal.rs | 3 ++- safekeeper/src/state.rs | 5 ++--- safekeeper/src/timeline.rs | 5 ++--- safekeeper/src/timeline_manager.rs | 2 +- safekeeper/src/wal_backup_partial.rs | 2 +- safekeeper/src/wal_reader_stream.rs | 2 +- 16 files changed, 47 insertions(+), 36 deletions(-) create mode 100644 libs/safekeeper_api/src/state.rs diff --git a/libs/safekeeper_api/src/lib.rs b/libs/safekeeper_api/src/lib.rs index 63c2c51188b8..fcaf7d31ba2c 100644 --- a/libs/safekeeper_api/src/lib.rs +++ b/libs/safekeeper_api/src/lib.rs @@ -5,6 +5,12 @@ use const_format::formatcp; /// Public API types pub mod models; +pub mod state; + +/// Consensus logical timestamp. Note: it is a part of sk control file. +pub type Term = u64; +pub const INVALID_TERM: Term = 0; + pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 28666d197afd..f5c861d1eb40 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -5,6 +5,13 @@ use utils::{ lsn::Lsn, }; +use crate::Term; + +#[derive(Debug, Serialize)] +pub struct SafekeeperStatus { + pub id: NodeId, +} + #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { pub tenant_id: TenantId, @@ -18,6 +25,14 @@ pub struct TimelineCreateRequest { pub local_start_lsn: Option, } +/// Same as TermLsn, but serializes LSN using display serializer +/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct TermSwitchApiEntry { + pub term: Term, + pub lsn: Lsn, +} + fn lsn_invalid() -> Lsn { Lsn::INVALID } diff --git a/libs/safekeeper_api/src/state.rs b/libs/safekeeper_api/src/state.rs new file mode 100644 index 000000000000..8b137891791f --- /dev/null +++ b/libs/safekeeper_api/src/state.rs @@ -0,0 +1 @@ + diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index a4b4670e423b..47fbcf7e4772 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,11 +1,12 @@ //! Code to deal with safekeeper control file upgrades use crate::{ - safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn}, + safekeeper::{AcceptorState, PgUuid, ServerInfo, TermHistory, TermLsn}, state::{EvictionState, PersistedPeers, TimelinePersistentState}, wal_backup_partial, }; use anyhow::{bail, Result}; use pq_proto::SystemId; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tracing::*; use utils::{ diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 2ca6333ba835..563162bb2afd 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -4,6 +4,7 @@ use anyhow::Context; use pageserver_api::models::ShardParameters; use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; +use safekeeper_api::Term; use std::future::Future; use std::str::{self, FromStr}; use std::sync::Arc; @@ -16,7 +17,6 @@ use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE}; -use crate::safekeeper::Term; use crate::timeline::TimelineError; use crate::wal_service::ConnectionId; use crate::{GlobalTimelines, SafeKeeperConf}; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 71c36f1d4631..082854b8c206 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,4 +1,7 @@ use hyper::{Body, Request, Response, StatusCode}; +use safekeeper_api::models::SafekeeperStatus; +use safekeeper_api::models::TermSwitchApiEntry; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; @@ -31,13 +34,12 @@ use utils::{ request::{ensure_no_body, parse_request_param}, RequestExt, RouterBuilder, }, - id::{NodeId, TenantId, TenantTimelineId, TimelineId}, + id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, }; use crate::debug_dump::TimelineDigestRequest; use crate::receive_wal::WalReceiverState; -use crate::safekeeper::Term; use crate::safekeeper::{ServerInfo, TermLsn}; use crate::send_wal::WalSenderState; use crate::timeline::PeerInfo; @@ -46,11 +48,6 @@ use crate::GlobalTimelines; use crate::SafeKeeperConf; use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline}; -#[derive(Debug, Serialize)] -struct SafekeeperStatus { - id: NodeId, -} - /// Healthcheck handler. async fn status_handler(request: Request) -> Result, ApiError> { check_permission(&request, None)?; @@ -73,14 +70,6 @@ fn get_global_timelines(request: &Request) -> Arc { .clone() } -/// Same as TermLsn, but serializes LSN using display serializer -/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct TermSwitchApiEntry { - pub term: Term, - pub lsn: Lsn, -} - impl From for TermLsn { fn from(api_val: TermSwitchApiEntry) -> Self { TermLsn { @@ -94,7 +83,7 @@ impl From for TermLsn { #[derive(Debug, Serialize, Deserialize)] pub struct AcceptorStateStatus { pub term: Term, - pub epoch: Term, // aka last_log_term + pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility pub term_history: Vec, } diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index dc4ad3706e6c..dfa7b02ed66c 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -8,6 +8,7 @@ use anyhow::Context; use postgres_backend::QueryError; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::*; @@ -17,7 +18,7 @@ use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo}; use crate::safekeeper::{ AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, }; -use crate::safekeeper::{Term, TermHistory, TermLsn}; +use crate::safekeeper::{TermHistory, TermLsn}; use crate::state::TimelinePersistentState; use crate::timeline::WalResidentTimeline; use postgres_backend::PostgresBackend; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index f58a9dca1dbc..41356b2989af 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -4,6 +4,7 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use std::{ cmp::min, @@ -25,7 +26,6 @@ use crate::{ client::{self, Client}, routes::TimelineStatus, }, - safekeeper::Term, state::{EvictionState, TimelinePersistentState}, timeline::{Timeline, WalResidentTimeline}, timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline}, diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 7b87166aa052..d424bddbd32a 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -7,6 +7,7 @@ use std::{fmt, pin::pin}; use anyhow::{bail, Context}; use futures::StreamExt; use postgres_protocol::message::backend::ReplicationMessage; +use safekeeper_api::Term; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::timeout; use tokio::{ @@ -27,8 +28,8 @@ use crate::{ http::routes::TimelineStatus, receive_wal::MSG_QUEUE_SIZE, safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory, - TermLsn, VoteRequest, + AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, TermHistory, TermLsn, + VoteRequest, }, timeline::PeerInfo, SafeKeeperConf, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 6eb69f0b7ce2..4f0e254b7ac6 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,6 +5,8 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::{TimeLineID, MAX_SEND_SIZE}; +use safekeeper_api::Term; +use safekeeper_api::INVALID_TERM; use serde::{Deserialize, Serialize}; use std::cmp::max; use std::cmp::min; @@ -31,10 +33,6 @@ use utils::{ const SK_PROTOCOL_VERSION: u32 = 2; pub const UNKNOWN_SERVER_VERSION: u32 = 0; -/// Consensus logical timestamp. -pub type Term = u64; -pub const INVALID_TERM: Term = 0; - #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct TermLsn { pub term: Term, diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 0887cf726418..cfccd048a3da 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -4,7 +4,7 @@ use crate::handler::SafekeeperPostgresHandler; use crate::metrics::RECEIVED_PS_FEEDBACKS; use crate::receive_wal::WalReceivers; -use crate::safekeeper::{Term, TermLsn}; +use crate::safekeeper::TermLsn; use crate::send_interpreted_wal::InterpretedWalSender; use crate::timeline::WalResidentTimeline; use crate::wal_reader_stream::WalReaderStreamBuilder; @@ -19,6 +19,7 @@ use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use utils::failpoint_support; diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index 941b7e67d0a9..ad3dc1b0137b 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -5,7 +5,7 @@ use std::{cmp::max, ops::Deref}; use anyhow::{bail, Result}; use postgres_ffi::WAL_SEGMENT_SIZE; -use safekeeper_api::models::TimelineTermBumpResponse; +use safekeeper_api::{models::TimelineTermBumpResponse, Term}; use serde::{Deserialize, Serialize}; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -15,8 +15,7 @@ use utils::{ use crate::{ control_file, safekeeper::{ - AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory, - UNKNOWN_SERVER_VERSION, + AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory, UNKNOWN_SERVER_VERSION, }, timeline::TimelineError, wal_backup_partial::{self}, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 94d6ef106160..e8fde8e198a5 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -5,6 +5,7 @@ use anyhow::{anyhow, bail, Result}; use camino::{Utf8Path, Utf8PathBuf}; use remote_storage::RemotePath; use safekeeper_api::models::TimelineTermBumpResponse; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio::fs::{self}; use tokio_util::sync::CancellationToken; @@ -31,9 +32,7 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use crate::control_file; use crate::rate_limit::RateLimiter; use crate::receive_wal::WalReceivers; -use crate::safekeeper::{ - AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn, -}; +use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn}; use crate::send_wal::WalSenders; use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState}; use crate::timeline_guard::ResidenceGuard; diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index c02fb904cf63..583b18ea2913 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -14,6 +14,7 @@ use std::{ use futures::channel::oneshot; use postgres_ffi::XLogSegNo; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio::{ task::{JoinError, JoinHandle}, @@ -32,7 +33,6 @@ use crate::{ rate_limit::{rand_duration, RateLimiter}, recovery::recovery_main, remove_wal::calc_horizon_lsn, - safekeeper::Term, send_wal::WalSenders, state::TimelineState, timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline}, diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index bddfca50e4fb..4e5b34a9bf65 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -22,6 +22,7 @@ use camino::Utf8PathBuf; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use remote_storage::RemotePath; +use safekeeper_api::Term; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; @@ -31,7 +32,6 @@ use utils::{id::NodeId, lsn::Lsn}; use crate::{ metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS}, rate_limit::{rand_duration, RateLimiter}, - safekeeper::Term, timeline::WalResidentTimeline, timeline_manager::StateSnapshot, wal_backup::{self}, diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs index f8c0c502cdbc..aea628c20808 100644 --- a/safekeeper/src/wal_reader_stream.rs +++ b/safekeeper/src/wal_reader_stream.rs @@ -4,12 +4,12 @@ use async_stream::try_stream; use bytes::Bytes; use futures::Stream; use postgres_backend::CopyStreamHandlerEnd; +use safekeeper_api::Term; use std::time::Duration; use tokio::time::timeout; use utils::lsn::Lsn; use crate::{ - safekeeper::Term, send_wal::{EndWatch, WalSenderGuard}, timeline::WalResidentTimeline, }; From 3abc28ab0ce8eeb02b8bd9eb5ad6b424fe451b0c Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 13 Dec 2024 12:47:17 +0100 Subject: [PATCH 2/4] Move the rest of types. --- Cargo.lock | 3 + libs/safekeeper_api/Cargo.toml | 5 +- libs/safekeeper_api/src/lib.rs | 15 ++- libs/safekeeper_api/src/models.rs | 157 ++++++++++++++++++++++++- libs/safekeeper_api/src/send_wal.rs | 2 + libs/safekeeper_api/src/state.rs | 4 +- safekeeper/src/control_file_upgrade.rs | 4 +- safekeeper/src/debug_dump.rs | 2 +- safekeeper/src/handler.rs | 2 +- safekeeper/src/http/client.rs | 3 +- safekeeper/src/http/routes.rs | 55 ++------- safekeeper/src/json_ctrl.rs | 4 +- safekeeper/src/pull_timeline.rs | 7 +- safekeeper/src/receive_wal.rs | 21 +--- safekeeper/src/recovery.rs | 3 +- safekeeper/src/safekeeper.rs | 13 +- safekeeper/src/send_wal.rs | 86 +------------- safekeeper/src/state.rs | 6 +- safekeeper/src/timeline.rs | 50 ++------ safekeeper/src/timeline_manager.rs | 4 +- safekeeper/src/timelines_global_map.rs | 2 +- safekeeper/src/wal_backup.rs | 3 +- safekeeper/src/wal_service.rs | 3 +- 23 files changed, 234 insertions(+), 220 deletions(-) create mode 100644 libs/safekeeper_api/src/send_wal.rs diff --git a/Cargo.lock b/Cargo.lock index e2d5e03613b1..c4f80f63c9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5565,7 +5565,10 @@ name = "safekeeper_api" version = "0.1.0" dependencies = [ "const_format", + "postgres_ffi", + "pq_proto", "serde", + "tokio", "utils", ] diff --git a/libs/safekeeper_api/Cargo.toml b/libs/safekeeper_api/Cargo.toml index 14811232d33b..4234ec6779a2 100644 --- a/libs/safekeeper_api/Cargo.toml +++ b/libs/safekeeper_api/Cargo.toml @@ -5,6 +5,9 @@ edition.workspace = true license.workspace = true [dependencies] -serde.workspace = true const_format.workspace = true +serde.workspace = true +postgres_ffi.workspace = true +pq_proto.workspace = true +tokio.workspace = true utils.workspace = true diff --git a/libs/safekeeper_api/src/lib.rs b/libs/safekeeper_api/src/lib.rs index fcaf7d31ba2c..365242cca572 100644 --- a/libs/safekeeper_api/src/lib.rs +++ b/libs/safekeeper_api/src/lib.rs @@ -1,16 +1,29 @@ #![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] use const_format::formatcp; +use pq_proto::SystemId; +use serde::{Deserialize, Serialize}; /// Public API types pub mod models; -pub mod state; +pub mod send_wal; /// Consensus logical timestamp. Note: it is a part of sk control file. pub type Term = u64; pub const INVALID_TERM: Term = 0; +/// Information about Postgres. Safekeeper gets it once and then verifies all +/// further connections from computes match. Note: it is a part of sk control +/// file. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ServerInfo { + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + pub wal_seg_size: u32, +} + pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index f5c861d1eb40..3e424a792c7f 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -1,11 +1,17 @@ +//! Types used in safekeeper http API. Many of them are also reused internally. + +use postgres_ffi::TimestampTz; use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +use tokio::time::Instant; use utils::{ - id::{NodeId, TenantId, TimelineId}, + id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, + pageserver_feedback::PageserverFeedback, }; -use crate::Term; +use crate::{ServerInfo, Term}; #[derive(Debug, Serialize)] pub struct SafekeeperStatus { @@ -33,6 +39,153 @@ pub struct TermSwitchApiEntry { pub lsn: Lsn, } +/// Augment AcceptorState with last_log_term for convenience +#[derive(Debug, Serialize, Deserialize)] +pub struct AcceptorStateStatus { + pub term: Term, + pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility + pub term_history: Vec, +} + +/// Things safekeeper should know about timeline state on peers. +/// Used as both model and internally. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerInfo { + pub sk_id: NodeId, + pub term: Term, + /// Term of the last entry. + pub last_log_term: Term, + /// LSN of the last record. + pub flush_lsn: Lsn, + pub commit_lsn: Lsn, + /// Since which LSN safekeeper has WAL. + pub local_start_lsn: Lsn, + /// When info was received. Serde annotations are not very useful but make + /// the code compile -- we don't rely on this field externally. + #[serde(skip)] + #[serde(default = "Instant::now")] + pub ts: Instant, + pub pg_connstr: String, + pub http_connstr: String, +} + +pub type FullTransactionId = u64; + +/// Hot standby feedback received from replica +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct HotStandbyFeedback { + pub ts: TimestampTz, + pub xmin: FullTransactionId, + pub catalog_xmin: FullTransactionId, +} + +pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0; + +impl HotStandbyFeedback { + pub fn empty() -> HotStandbyFeedback { + HotStandbyFeedback { + ts: 0, + xmin: 0, + catalog_xmin: 0, + } + } +} + +/// Standby status update +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct StandbyReply { + pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby. + pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby. + pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby. + pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. + pub reply_requested: bool, +} + +impl StandbyReply { + pub fn empty() -> Self { + StandbyReply { + write_lsn: Lsn::INVALID, + flush_lsn: Lsn::INVALID, + apply_lsn: Lsn::INVALID, + reply_ts: 0, + reply_requested: false, + } + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct StandbyFeedback { + pub reply: StandbyReply, + pub hs_feedback: HotStandbyFeedback, +} + +impl StandbyFeedback { + pub fn empty() -> Self { + StandbyFeedback { + reply: StandbyReply::empty(), + hs_feedback: HotStandbyFeedback::empty(), + } + } +} + +/// Receiver is either pageserver or regular standby, which have different +/// feedbacks. +/// Used as both model and internally. +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ReplicationFeedback { + Pageserver(PageserverFeedback), + Standby(StandbyFeedback), +} + +/// Uniquely identifies a WAL service connection. Logged in spans for +/// observability. +pub type ConnectionId = u32; + +/// Serialize is used only for json'ing in API response. Also used internally. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalSenderState { + pub ttid: TenantTimelineId, + pub addr: SocketAddr, + pub conn_id: ConnectionId, + // postgres application_name + pub appname: Option, + pub feedback: ReplicationFeedback, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalReceiverState { + /// None means it is recovery initiated by us (this safekeeper). + pub conn_id: Option, + pub status: WalReceiverStatus, +} + +/// Walreceiver status. Currently only whether it passed voting stage and +/// started receiving the stream, but it is easy to add more if needed. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum WalReceiverStatus { + Voting, + Streaming, +} + +/// Info about timeline on safekeeper ready for reporting. +#[derive(Debug, Serialize, Deserialize)] +pub struct TimelineStatus { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub acceptor_state: AcceptorStateStatus, + pub pg_info: ServerInfo, + pub flush_lsn: Lsn, + pub timeline_start_lsn: Lsn, + pub local_start_lsn: Lsn, + pub commit_lsn: Lsn, + pub backup_lsn: Lsn, + pub peer_horizon_lsn: Lsn, + pub remote_consistent_lsn: Lsn, + pub peers: Vec, + pub walsenders: Vec, + pub walreceivers: Vec, +} + fn lsn_invalid() -> Lsn { Lsn::INVALID } diff --git a/libs/safekeeper_api/src/send_wal.rs b/libs/safekeeper_api/src/send_wal.rs new file mode 100644 index 000000000000..e29cd6e0292a --- /dev/null +++ b/libs/safekeeper_api/src/send_wal.rs @@ -0,0 +1,2 @@ +//! This module contains structs used in safekeeper send_wal.rs which are also +//! exposed in http API. diff --git a/libs/safekeeper_api/src/state.rs b/libs/safekeeper_api/src/state.rs index 8b137891791f..340499c3a17f 100644 --- a/libs/safekeeper_api/src/state.rs +++ b/libs/safekeeper_api/src/state.rs @@ -1 +1,3 @@ - +//! This module contains structs which are exposed in the http API but at the +//! same time part of the safekeeper control file. Any change of in them must be +//! accompanied with control file upgrade. diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 47fbcf7e4772..dd152fd4cce8 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,12 +1,12 @@ //! Code to deal with safekeeper control file upgrades use crate::{ - safekeeper::{AcceptorState, PgUuid, ServerInfo, TermHistory, TermLsn}, + safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn}, state::{EvictionState, PersistedPeers, TimelinePersistentState}, wal_backup_partial, }; use anyhow::{bail, Result}; use pq_proto::SystemId; -use safekeeper_api::Term; +use safekeeper_api::{ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tracing::*; use utils::{ diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index 93011eddec07..19362a0992d4 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -14,6 +14,7 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use postgres_ffi::XLogSegNo; use postgres_ffi::MAX_SEND_SIZE; +use safekeeper_api::models::WalSenderState; use serde::Deserialize; use serde::Serialize; @@ -25,7 +26,6 @@ use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use crate::safekeeper::TermHistory; -use crate::send_wal::WalSenderState; use crate::state::TimelineMemState; use crate::state::TimelinePersistentState; use crate::timeline::get_timeline_dir; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 563162bb2afd..bb639bfb3221 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -4,6 +4,7 @@ use anyhow::Context; use pageserver_api::models::ShardParameters; use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; +use safekeeper_api::models::ConnectionId; use safekeeper_api::Term; use std::future::Future; use std::str::{self, FromStr}; @@ -18,7 +19,6 @@ use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE}; use crate::timeline::TimelineError; -use crate::wal_service::ConnectionId; use crate::{GlobalTimelines, SafeKeeperConf}; use postgres_backend::PostgresBackend; use postgres_backend::QueryError; diff --git a/safekeeper/src/http/client.rs b/safekeeper/src/http/client.rs index a166fc1ab9b0..669a9c0ce94b 100644 --- a/safekeeper/src/http/client.rs +++ b/safekeeper/src/http/client.rs @@ -8,6 +8,7 @@ //! etc. use reqwest::{IntoUrl, Method, StatusCode}; +use safekeeper_api::models::TimelineStatus; use std::error::Error as _; use utils::{ http::error::HttpErrorBody, @@ -15,8 +16,6 @@ use utils::{ logging::SecretString, }; -use super::routes::TimelineStatus; - #[derive(Debug, Clone)] pub struct Client { mgmt_api_endpoint: String, diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 082854b8c206..9bc1bf340919 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,8 +1,9 @@ use hyper::{Body, Request, Response, StatusCode}; +use safekeeper_api::models::AcceptorStateStatus; use safekeeper_api::models::SafekeeperStatus; use safekeeper_api::models::TermSwitchApiEntry; -use safekeeper_api::Term; -use serde::{Deserialize, Serialize}; +use safekeeper_api::models::TimelineStatus; +use safekeeper_api::ServerInfo; use std::collections::HashMap; use std::fmt; use std::io::Write as _; @@ -39,10 +40,7 @@ use utils::{ }; use crate::debug_dump::TimelineDigestRequest; -use crate::receive_wal::WalReceiverState; -use crate::safekeeper::{ServerInfo, TermLsn}; -use crate::send_wal::WalSenderState; -use crate::timeline::PeerInfo; +use crate::safekeeper::TermLsn; use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -70,42 +68,6 @@ fn get_global_timelines(request: &Request) -> Arc { .clone() } -impl From for TermLsn { - fn from(api_val: TermSwitchApiEntry) -> Self { - TermLsn { - term: api_val.term, - lsn: api_val.lsn, - } - } -} - -/// Augment AcceptorState with last_log_term for convenience -#[derive(Debug, Serialize, Deserialize)] -pub struct AcceptorStateStatus { - pub term: Term, - pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility - pub term_history: Vec, -} - -/// Info about timeline on safekeeper ready for reporting. -#[derive(Debug, Serialize, Deserialize)] -pub struct TimelineStatus { - pub tenant_id: TenantId, - pub timeline_id: TimelineId, - pub acceptor_state: AcceptorStateStatus, - pub pg_info: ServerInfo, - pub flush_lsn: Lsn, - pub timeline_start_lsn: Lsn, - pub local_start_lsn: Lsn, - pub commit_lsn: Lsn, - pub backup_lsn: Lsn, - pub peer_horizon_lsn: Lsn, - pub remote_consistent_lsn: Lsn, - pub peers: Vec, - pub walsenders: Vec, - pub walreceivers: Vec, -} - fn check_permission(request: &Request, tenant_id: Option) -> Result<(), ApiError> { check_permission_with(request, |claims| { crate::auth::check_permission(claims, tenant_id) @@ -176,6 +138,15 @@ async fn timeline_list_handler(request: Request) -> Result, json_response(StatusCode::OK, res) } +impl From for TermLsn { + fn from(api_val: TermSwitchApiEntry) -> Self { + TermLsn { + term: api_val.term, + lsn: api_val.lsn, + } + } +} + /// Report info about timeline. async fn timeline_status_handler(request: Request) -> Result, ApiError> { let ttid = TenantTimelineId::new( diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index dfa7b02ed66c..256e350ceba5 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -8,13 +8,13 @@ use anyhow::Context; use postgres_backend::QueryError; -use safekeeper_api::Term; +use safekeeper_api::{ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::*; use crate::handler::SafekeeperPostgresHandler; -use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo}; +use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; use crate::safekeeper::{ AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, }; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 41356b2989af..00777273cbf9 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -4,7 +4,7 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; -use safekeeper_api::Term; +use safekeeper_api::{models::TimelineStatus, Term}; use serde::{Deserialize, Serialize}; use std::{ cmp::min, @@ -22,10 +22,7 @@ use tracing::{error, info, instrument}; use crate::{ control_file::CONTROL_FILE_NAME, debug_dump, - http::{ - client::{self, Client}, - routes::TimelineStatus, - }, + http::client::{self, Client}, state::{EvictionState, TimelinePersistentState}, timeline::{Timeline, WalResidentTimeline}, timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline}, diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 2a49890d618f..08371177cd24 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -9,9 +9,7 @@ use crate::metrics::{ }; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; -use crate::safekeeper::ServerInfo; use crate::timeline::WalResidentTimeline; -use crate::wal_service::ConnectionId; use crate::GlobalTimelines; use anyhow::{anyhow, Context}; use bytes::BytesMut; @@ -23,8 +21,8 @@ use postgres_backend::PostgresBackend; use postgres_backend::PostgresBackendReader; use postgres_backend::QueryError; use pq_proto::BeMessage; -use serde::Deserialize; -use serde::Serialize; +use safekeeper_api::models::{ConnectionId, WalReceiverState, WalReceiverStatus}; +use safekeeper_api::ServerInfo; use std::future; use std::net::SocketAddr; use std::sync::Arc; @@ -171,21 +169,6 @@ impl WalReceiversShared { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WalReceiverState { - /// None means it is recovery initiated by us (this safekeeper). - pub conn_id: Option, - pub status: WalReceiverStatus, -} - -/// Walreceiver status. Currently only whether it passed voting stage and -/// started receiving the stream, but it is easy to add more if needed. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum WalReceiverStatus { - Voting, - Streaming, -} - /// Scope guard to access slot in WalReceivers registry and unregister from /// it in Drop. pub struct WalReceiverGuard { diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index d424bddbd32a..61647c16b00a 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -7,6 +7,7 @@ use std::{fmt, pin::pin}; use anyhow::{bail, Context}; use futures::StreamExt; use postgres_protocol::message::backend::ReplicationMessage; +use safekeeper_api::models::{PeerInfo, TimelineStatus}; use safekeeper_api::Term; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::timeout; @@ -25,13 +26,11 @@ use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; use crate::safekeeper::{AppendRequest, AppendRequestHeader}; use crate::timeline::WalResidentTimeline; use crate::{ - http::routes::TimelineStatus, receive_wal::MSG_QUEUE_SIZE, safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, TermHistory, TermLsn, VoteRequest, }, - timeline::PeerInfo, SafeKeeperConf, }; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 4f0e254b7ac6..ccd7940c7212 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,6 +5,7 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::{TimeLineID, MAX_SEND_SIZE}; +use safekeeper_api::models::HotStandbyFeedback; use safekeeper_api::Term; use safekeeper_api::INVALID_TERM; use serde::{Deserialize, Serialize}; @@ -18,7 +19,6 @@ use tracing::*; use crate::control_file; use crate::metrics::MISC_OPERATION_SECONDS; -use crate::send_wal::HotStandbyFeedback; use crate::state::TimelineState; use crate::wal_storage; @@ -196,16 +196,6 @@ impl AcceptorState { } } -/// Information about Postgres. Safekeeper gets it once and then verifies -/// all further connections from computes match. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ServerInfo { - /// Postgres server version - pub pg_version: u32, - pub system_id: SystemId, - pub wal_seg_size: u32, -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistedPeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. @@ -1039,6 +1029,7 @@ where mod tests { use futures::future::BoxFuture; use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE}; + use safekeeper_api::ServerInfo; use super::*; use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState}; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index cfccd048a3da..84632219984a 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -8,7 +8,6 @@ use crate::safekeeper::TermLsn; use crate::send_interpreted_wal::InterpretedWalSender; use crate::timeline::WalResidentTimeline; use crate::wal_reader_stream::WalReaderStreamBuilder; -use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; use anyhow::{bail, Context as AnyhowContext}; use bytes::Bytes; @@ -19,8 +18,11 @@ use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; +use safekeeper_api::models::{ + ConnectionId, HotStandbyFeedback, ReplicationFeedback, StandbyFeedback, StandbyReply, + WalSenderState, INVALID_FULL_TRANSACTION_ID, +}; use safekeeper_api::Term; -use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use utils::failpoint_support; use utils::id::TenantTimelineId; @@ -29,7 +31,6 @@ use utils::postgres_client::PostgresClientProtocol; use std::cmp::{max, min}; use std::net::SocketAddr; -use std::str; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch::Receiver; @@ -43,65 +44,6 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; // neon extension of replication protocol const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; -type FullTransactionId = u64; - -/// Hot standby feedback received from replica -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct HotStandbyFeedback { - pub ts: TimestampTz, - pub xmin: FullTransactionId, - pub catalog_xmin: FullTransactionId, -} - -const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0; - -impl HotStandbyFeedback { - pub fn empty() -> HotStandbyFeedback { - HotStandbyFeedback { - ts: 0, - xmin: 0, - catalog_xmin: 0, - } - } -} - -/// Standby status update -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct StandbyReply { - pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby. - pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby. - pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby. - pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01. - pub reply_requested: bool, -} - -impl StandbyReply { - fn empty() -> Self { - StandbyReply { - write_lsn: Lsn::INVALID, - flush_lsn: Lsn::INVALID, - apply_lsn: Lsn::INVALID, - reply_ts: 0, - reply_requested: false, - } - } -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct StandbyFeedback { - pub reply: StandbyReply, - pub hs_feedback: HotStandbyFeedback, -} - -impl StandbyFeedback { - pub fn empty() -> Self { - StandbyFeedback { - reply: StandbyReply::empty(), - hs_feedback: HotStandbyFeedback::empty(), - } - } -} - /// WalSenders registry. Timeline holds it (wrapped in Arc). pub struct WalSenders { mutex: Mutex, @@ -342,25 +284,6 @@ impl WalSendersShared { } } -// Serialized is used only for pretty printing in json. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WalSenderState { - ttid: TenantTimelineId, - addr: SocketAddr, - conn_id: ConnectionId, - // postgres application_name - appname: Option, - feedback: ReplicationFeedback, -} - -// Receiver is either pageserver or regular standby, which have different -// feedbacks. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -enum ReplicationFeedback { - Pageserver(PageserverFeedback), - Standby(StandbyFeedback), -} - // id of the occupied slot in WalSenders to access it (and save in the // WalSenderGuard). We could give Arc directly to the slot, but there is not // much sense in that as values aggregation which is performed on each feedback @@ -889,6 +812,7 @@ impl ReplyReader { #[cfg(test)] mod tests { + use safekeeper_api::models::FullTransactionId; use utils::id::{TenantId, TimelineId}; use super::*; diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index ad3dc1b0137b..c6ae6c1d2b0e 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -5,7 +5,7 @@ use std::{cmp::max, ops::Deref}; use anyhow::{bail, Result}; use postgres_ffi::WAL_SEGMENT_SIZE; -use safekeeper_api::{models::TimelineTermBumpResponse, Term}; +use safekeeper_api::{models::TimelineTermBumpResponse, ServerInfo, Term}; use serde::{Deserialize, Serialize}; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -14,9 +14,7 @@ use utils::{ use crate::{ control_file, - safekeeper::{ - AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory, UNKNOWN_SERVER_VERSION, - }, + safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, TermHistory, UNKNOWN_SERVER_VERSION}, timeline::TimelineError, wal_backup_partial::{self}, }; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index e8fde8e198a5..36860a0da2b4 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,9 +4,8 @@ use anyhow::{anyhow, bail, Result}; use camino::{Utf8Path, Utf8PathBuf}; use remote_storage::RemotePath; -use safekeeper_api::models::TimelineTermBumpResponse; +use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse}; use safekeeper_api::Term; -use serde::{Deserialize, Serialize}; use tokio::fs::{self}; use tokio_util::sync::CancellationToken; use utils::id::TenantId; @@ -46,40 +45,17 @@ use crate::wal_storage::{Storage as wal_storage_iface, WalReader}; use crate::SafeKeeperConf; use crate::{debug_dump, timeline_manager, wal_storage}; -/// Things safekeeper should know about timeline state on peers. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerInfo { - pub sk_id: NodeId, - pub term: Term, - /// Term of the last entry. - pub last_log_term: Term, - /// LSN of the last record. - pub flush_lsn: Lsn, - pub commit_lsn: Lsn, - /// Since which LSN safekeeper has WAL. - pub local_start_lsn: Lsn, - /// When info was received. Serde annotations are not very useful but make - /// the code compile -- we don't rely on this field externally. - #[serde(skip)] - #[serde(default = "Instant::now")] - ts: Instant, - pub pg_connstr: String, - pub http_connstr: String, -} - -impl PeerInfo { - fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo { - PeerInfo { - sk_id: NodeId(sk_info.safekeeper_id), - term: sk_info.term, - last_log_term: sk_info.last_log_term, - flush_lsn: Lsn(sk_info.flush_lsn), - commit_lsn: Lsn(sk_info.commit_lsn), - local_start_lsn: Lsn(sk_info.local_start_lsn), - pg_connstr: sk_info.safekeeper_connstr.clone(), - http_connstr: sk_info.http_connstr.clone(), - ts, - } +fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo { + PeerInfo { + sk_id: NodeId(sk_info.safekeeper_id), + term: sk_info.term, + last_log_term: sk_info.last_log_term, + flush_lsn: Lsn(sk_info.flush_lsn), + commit_lsn: Lsn(sk_info.commit_lsn), + local_start_lsn: Lsn(sk_info.local_start_lsn), + pg_connstr: sk_info.safekeeper_connstr.clone(), + http_connstr: sk_info.http_connstr.clone(), + ts, } } @@ -696,7 +672,7 @@ impl Timeline { { let mut shared_state = self.write_shared_state().await; shared_state.sk.record_safekeeper_info(&sk_info).await?; - let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); + let peer_info = peer_info_from_sk_info(&sk_info, Instant::now()); shared_state.peers_info.upsert(&peer_info); } Ok(()) diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 583b18ea2913..a33994dcabaa 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -14,7 +14,7 @@ use std::{ use futures::channel::oneshot; use postgres_ffi::XLogSegNo; -use safekeeper_api::Term; +use safekeeper_api::{models::PeerInfo, Term}; use serde::{Deserialize, Serialize}; use tokio::{ task::{JoinError, JoinHandle}, @@ -35,7 +35,7 @@ use crate::{ remove_wal::calc_horizon_lsn, send_wal::WalSenders, state::TimelineState, - timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline}, + timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline}, timeline_guard::{AccessService, GuardId, ResidenceGuard}, timelines_set::{TimelineSetGuard, TimelinesSet}, wal_backup::{self, WalBackupTaskHandle}, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index e1241ceb9b84..ad29c9f66c2c 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -4,7 +4,6 @@ use crate::defaults::DEFAULT_EVICTION_CONCURRENCY; use crate::rate_limit::RateLimiter; -use crate::safekeeper::ServerInfo; use crate::state::TimelinePersistentState; use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError}; use crate::timelines_set::TimelinesSet; @@ -13,6 +12,7 @@ use crate::{control_file, wal_storage, SafeKeeperConf}; use anyhow::{bail, Context, Result}; use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; +use safekeeper_api::ServerInfo; use serde::Serialize; use std::collections::HashMap; use std::str::FromStr; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 34b5dbeaa1cf..8517fa03443c 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -3,6 +3,7 @@ use anyhow::{Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; use futures::stream::FuturesOrdered; use futures::StreamExt; +use safekeeper_api::models::PeerInfo; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use utils::backoff; @@ -30,7 +31,7 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS}; -use crate::timeline::{PeerInfo, WalResidentTimeline}; +use crate::timeline::WalResidentTimeline; use crate::timeline_manager::{Manager, StateSnapshot}; use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME}; diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 1ff83918a76c..1ebcb060e776 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -4,6 +4,7 @@ //! use anyhow::{Context, Result}; use postgres_backend::QueryError; +use safekeeper_api::models::ConnectionId; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; @@ -114,8 +115,6 @@ async fn handle_socket( .await } -/// Unique WAL service connection ids are logged in spans for observability. -pub type ConnectionId = u32; pub type ConnectionCount = u32; pub fn issue_connection_id(count: &mut ConnectionCount) -> ConnectionId { From 83db0cd17621884de1c087848252d9a669bc5a3f Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 13 Dec 2024 13:02:19 +0100 Subject: [PATCH 3/4] rm empty files --- libs/safekeeper_api/src/lib.rs | 2 -- libs/safekeeper_api/src/send_wal.rs | 2 -- libs/safekeeper_api/src/state.rs | 3 --- 3 files changed, 7 deletions(-) delete mode 100644 libs/safekeeper_api/src/send_wal.rs delete mode 100644 libs/safekeeper_api/src/state.rs diff --git a/libs/safekeeper_api/src/lib.rs b/libs/safekeeper_api/src/lib.rs index 365242cca572..be6923aca902 100644 --- a/libs/safekeeper_api/src/lib.rs +++ b/libs/safekeeper_api/src/lib.rs @@ -7,8 +7,6 @@ use serde::{Deserialize, Serialize}; /// Public API types pub mod models; -pub mod send_wal; - /// Consensus logical timestamp. Note: it is a part of sk control file. pub type Term = u64; pub const INVALID_TERM: Term = 0; diff --git a/libs/safekeeper_api/src/send_wal.rs b/libs/safekeeper_api/src/send_wal.rs deleted file mode 100644 index e29cd6e0292a..000000000000 --- a/libs/safekeeper_api/src/send_wal.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! This module contains structs used in safekeeper send_wal.rs which are also -//! exposed in http API. diff --git a/libs/safekeeper_api/src/state.rs b/libs/safekeeper_api/src/state.rs deleted file mode 100644 index 340499c3a17f..000000000000 --- a/libs/safekeeper_api/src/state.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! This module contains structs which are exposed in the http API but at the -//! same time part of the safekeeper control file. Any change of in them must be -//! accompanied with control file upgrade. From 7ee5d88d7ed2ced00698f122e4558a3d79350dbf Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 13 Dec 2024 13:59:33 +0100 Subject: [PATCH 4/4] fix simulator --- safekeeper/tests/walproposer_sim/safekeeper.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 12aa02577185..efcdd89e7da7 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -15,12 +15,13 @@ use desim::{ }; use http::Uri; use safekeeper::{ - safekeeper::{ProposerAcceptorMessage, SafeKeeper, ServerInfo, UNKNOWN_SERVER_VERSION}, + safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION}, state::{TimelinePersistentState, TimelineState}, timeline::TimelineError, wal_storage::Storage, SafeKeeperConf, }; +use safekeeper_api::ServerInfo; use tracing::{debug, info_span, warn}; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId},