Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a mechanism for converting SQL schemas based on negotiated version #2417

Merged
merged 9 commits into from
Jan 16, 2025
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ restate-types = { workspace = true }
anyhow = { workspace = true }
arc-swap = { workspace = true }
arrow = { version = "53.1.0", features = ["ipc", "prettyprint", "json"] }
arrow_convert = { version = "0.7.2" }
arrow_convert = {git = "https://github.com/jackkleeman/arrow-convert", rev = "9878358dd38cf4a2a6c8367541be3cd374fbb2c6"}
axum = { workspace = true, default-features = false, features = ["http1", "http2", "query", "tokio"] }
bytes = { workspace = true }
base62 = { version = "2.0.2" }
Expand Down
20 changes: 18 additions & 2 deletions cli/src/clients/admin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::clients::AdminClientInterface;
use super::errors::ApiError;

/// Min/max supported admin API versions
pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V1;
pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V1;
pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2;
pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2;

#[derive(Error, Debug)]
#[error(transparent)]
Expand Down Expand Up @@ -164,6 +164,22 @@ impl AdminClient {
Ok(client)
}

pub fn versioned_url(&self, path: impl IntoIterator<Item = impl AsRef<str>>) -> Url {
let mut url = self.base_url.clone();

{
let mut segments = url.path_segments_mut().expect("Bad url!");
segments.pop_if_empty();

match self.admin_api_version.as_path_segment() {
Some(version) => segments.push(version).extend(path),
None => segments.extend(path),
};
}

url
}

fn choose_api_version(
mut client: AdminClient,
version_information: VersionInformation,
Expand Down
57 changes: 14 additions & 43 deletions cli/src/clients/admin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::fmt::Display;

use super::admin_client::Envelope;
use super::AdminClient;
Expand All @@ -28,7 +27,7 @@ pub trait AdminClientInterface {
modify_service_request: ModifyServiceRequest,
) -> reqwest::Result<Envelope<ServiceMetadata>>;
async fn get_deployments(&self) -> reqwest::Result<Envelope<ListDeploymentsResponse>>;
async fn get_deployment<D: Display>(
async fn get_deployment<D: AsRef<str>>(
&self,
id: D,
) -> reqwest::Result<Envelope<DetailedDeploymentResponse>>;
Expand All @@ -54,21 +53,17 @@ pub trait AdminClientInterface {

impl AdminClientInterface for AdminClient {
async fn health(&self) -> reqwest::Result<Envelope<()>> {
let url = self.base_url.join("/health").expect("Bad url!");
let url = self.versioned_url(["health"]);
self.run(reqwest::Method::GET, url).await
}

async fn get_services(&self) -> reqwest::Result<Envelope<ListServicesResponse>> {
let url = self.base_url.join("/services").expect("Bad url!");
let url = self.versioned_url(["services"]);
self.run(reqwest::Method::GET, url).await
}

async fn get_service(&self, name: &str) -> reqwest::Result<Envelope<ServiceMetadata>> {
let url = self
.base_url
.join(&format!("/services/{name}"))
.expect("Bad url!");

let url = self.versioned_url(["services", name]);
self.run(reqwest::Method::GET, url).await
}

Expand All @@ -77,37 +72,26 @@ impl AdminClientInterface for AdminClient {
name: &str,
modify_service_request: ModifyServiceRequest,
) -> reqwest::Result<Envelope<ServiceMetadata>> {
let url = self
.base_url
.join(&format!("/services/{name}"))
.expect("Bad url!");

let url = self.versioned_url(["services", name]);
self.run_with_body(reqwest::Method::PATCH, url, modify_service_request)
.await
}

async fn get_deployments(&self) -> reqwest::Result<Envelope<ListDeploymentsResponse>> {
let url = self.base_url.join("/deployments").expect("Bad url!");
let url = self.versioned_url(["deployments"]);
self.run(reqwest::Method::GET, url).await
}

async fn get_deployment<D: Display>(
async fn get_deployment<D: AsRef<str>>(
&self,
id: D,
) -> reqwest::Result<Envelope<DetailedDeploymentResponse>> {
let url = self
.base_url
.join(&format!("/deployments/{id}"))
.expect("Bad url!");
let url = self.versioned_url(["deployments", id.as_ref()]);
self.run(reqwest::Method::GET, url).await
}

async fn remove_deployment(&self, id: &str, force: bool) -> reqwest::Result<Envelope<()>> {
let mut url = self
.base_url
.join(&format!("/deployments/{id}"))
.expect("Bad url!");

let mut url = self.versioned_url(["deployments", id]);
url.set_query(Some(&format!("force={force}")));

self.run(reqwest::Method::DELETE, url).await
Expand All @@ -117,27 +101,19 @@ impl AdminClientInterface for AdminClient {
&self,
body: RegisterDeploymentRequest,
) -> reqwest::Result<Envelope<RegisterDeploymentResponse>> {
let url = self.base_url.join("/deployments").expect("Bad url!");
let url = self.versioned_url(["deployments"]);
self.run_with_body(reqwest::Method::POST, url, body).await
}

async fn purge_invocation(&self, id: &str) -> reqwest::Result<Envelope<()>> {
let mut url = self
.base_url
.join(&format!("/invocations/{id}"))
.expect("Bad url!");

let mut url = self.versioned_url(["invocations", id]);
url.set_query(Some("mode=purge"));

self.run(reqwest::Method::DELETE, url).await
}

async fn cancel_invocation(&self, id: &str, kill: bool) -> reqwest::Result<Envelope<()>> {
let mut url = self
.base_url
.join(&format!("/invocations/{id}"))
.expect("Bad url!");

let mut url = self.versioned_url(["invocations", id]);
url.set_query(Some(&format!(
"mode={}",
if kill { "kill" } else { "cancel" }
Expand All @@ -151,17 +127,12 @@ impl AdminClientInterface for AdminClient {
service: &str,
req: ModifyServiceStateRequest,
) -> reqwest::Result<Envelope<()>> {
let url = self
.base_url
.join(&format!("/services/{service}/state"))
.expect("Bad url!");

let url = self.versioned_url(["services", service, "state"]);
self.run_with_body(reqwest::Method::POST, url, req).await
}

async fn version(&self) -> reqwest::Result<Envelope<VersionInformation>> {
let url = self.base_url.join("/version").expect("Bad url!");

let url = self.versioned_url(["version"]);
self.run(reqwest::Method::GET, url).await
}
}
40 changes: 25 additions & 15 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::str::FromStr;

use anyhow::Result;
use arrow::array::{Array, ArrayAccessor, AsArray, StringArray};
use arrow::datatypes::{ArrowTemporalType, Date64Type};
use arrow::datatypes::ArrowTemporalType;
use arrow::record_batch::RecordBatch;
use arrow_convert::{ArrowDeserialize, ArrowField};
use bytes::Bytes;
Expand Down Expand Up @@ -111,10 +111,17 @@ fn value_as_u64_opt(batch: &RecordBatch, col: usize, row: usize) -> Option<u64>
}

fn value_as_dt_opt(batch: &RecordBatch, col: usize, row: usize) -> Option<chrono::DateTime<Local>> {
batch
.column(col)
.as_primitive::<arrow::datatypes::Date64Type>()
.value_as_local_datetime_opt(row)
let col = batch.column(col);
match col.data_type() {
arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, _) => col
.as_primitive::<arrow::datatypes::TimestampMillisecondType>()
.value_as_local_datetime_opt(row),
jackkleeman marked this conversation as resolved.
Show resolved Hide resolved
// older versions of restate used Date64 instead of TimestampMillisecond
arrow::datatypes::DataType::Date64 => col
.as_primitive::<arrow::datatypes::Date64Type>()
.value_as_local_datetime_opt(row),
_ => panic!("Column is not a timestamp"),
}
}

#[derive(ValueEnum, Copy, Clone, Eq, Hash, PartialEq, Debug, Default)]
Expand Down Expand Up @@ -515,11 +522,7 @@ pub async fn get_service_status(
.column(2)
.as_primitive::<arrow::datatypes::Int64Type>()
.value(i);
let oldest_at = batch
.column(3)
.as_primitive::<arrow::datatypes::Date64Type>()
.value_as_local_datetime_opt(i)
.unwrap();
let oldest_at = value_as_dt_opt(&batch, 3, i).unwrap();

let oldest_invocation = batch.column(4).as_string::<i32>().value_string(i);

Expand Down Expand Up @@ -744,23 +747,30 @@ impl From<RestateDateTime> for DateTime<Local> {
}
}

pub static TIMEZONE_UTC: std::sync::LazyLock<std::sync::Arc<str>> =
std::sync::LazyLock::new(|| std::sync::Arc::from("+00:00"));

impl arrow_convert::field::ArrowField for RestateDateTime {
type Type = Self;

#[inline]
fn data_type() -> arrow::datatypes::DataType {
arrow::datatypes::DataType::Date64
arrow::datatypes::DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
Some(TIMEZONE_UTC.clone()),
)
}
}

impl arrow_convert::deserialize::ArrowDeserialize for RestateDateTime {
type ArrayType = arrow::array::Date64Array;
type ArrayType = arrow::array::TimestampMillisecondArray;

#[inline]
fn arrow_deserialize(v: Option<i64>) -> Option<Self> {
v.and_then(arrow::temporal_conversions::as_datetime::<Date64Type>)
.map(|naive| Local.from_utc_datetime(&naive))
.map(RestateDateTime)
let timestamp = arrow::temporal_conversions::as_datetime::<
arrow::datatypes::TimestampMillisecondType,
>(v?)?;
Some(RestateDateTime(Local.from_utc_datetime(&timestamp)))
}
}

Expand Down
2 changes: 1 addition & 1 deletion cli/src/clients/datafusion_http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl DataFusionHttpClient {
fn prepare(&self) -> Result<reqwest::RequestBuilder, Error> {
Ok(self
.inner
.prepare(reqwest::Method::POST, self.inner.base_url.join("/query")?))
.prepare(reqwest::Method::POST, self.inner.versioned_url(["query"])))
}

pub async fn run_query(&self, query: String) -> Result<SqlResponse, Error> {
Expand Down
2 changes: 1 addition & 1 deletion cli/src/commands/deployments/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub async fn run_register(State(env): State<CliEnv>, discover_opts: &Register) -

// Is this an existing deployment?
let existing_deployment = match client
.get_deployment(&dry_run_result.id)
.get_deployment(&dry_run_result.id.to_string())
.await?
.into_body()
.await
Expand Down
2 changes: 1 addition & 1 deletion cli/src/commands/services/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn describe(env: &CliEnv, opts: &Describe) -> Result<()> {
table.add_kv_row("Deployment ID:", service.deployment_id);

let deployment = client
.get_deployment(&service.deployment_id)
.get_deployment(&service.deployment_id.to_string())
.await?
.into_body()
.await?;
Expand Down
Loading
Loading