Skip to content

Commit

Permalink
fix: on_error_callback route (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Jan 23, 2025
1 parent 02c179c commit 17dbc42
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 112 deletions.
70 changes: 22 additions & 48 deletions integrationos-api/src/logic/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ async fn test_connection(
.await;
}

if let Some(delay) = connection_config.test_delay_in_millis {
if delay < 0 {
tracing::warn!("test_delay_in_millis is negative, skipping delay");
} else {
let delay = u16::try_from(delay).map_err(|e| {
error!("Error converting test_delay_in_millis to u64: {:?}", e);

InternalError::serialize_error(
"Unable to convert test_delay_in_millis to u64",
None,
)
})?;

tokio::time::sleep(Duration::from_millis(delay.into())).await
}
}

let res = state
.extractor_caller
.execute_model_definition(
Expand All @@ -138,30 +155,8 @@ async fn test_connection(
}

impl PublicExt<Connection> for CreateConnectionPayload {
fn public(input: Connection) -> Value {
SanitizedConnection {
id: input.id,
platform_version: input.platform_version,
connection_definition_id: input.connection_definition_id,
r#type: input.r#type,
key: input.key,
group: input.group,
name: input.name,
environment: input.environment,
platform: input.platform,
secrets_service_id: input.secrets_service_id,
event_access_id: input.event_access_id,
identity: input.identity,
identity_type: input.identity_type,
settings: input.settings,
throughput: input.throughput,
ownership: input.ownership,
error: input.error,
has_error: input.has_error,
oauth: input.oauth,
record_metadata: input.record_metadata,
}
.to_value()
fn public(conn: Connection) -> Value {
Into::<SanitizedConnection>::into(conn).to_value()
}
}

Expand Down Expand Up @@ -303,7 +298,7 @@ pub async fn create_connection(
error!("Error creating secret for connection: {:?}", e);
})?;

let connection = Connection {
let conn = Connection {
id: connection_id,
platform_version: connection_config.clone().platform_version,
connection_definition_id: payload.connection_definition_id,
Expand Down Expand Up @@ -333,34 +328,13 @@ pub async fn create_connection(
state
.app_stores
.connection
.create_one(&connection)
.create_one(&conn)
.await
.inspect_err(|e| {
error!("Error creating connection: {:?}", e);
})?;

Ok(Json(SanitizedConnection {
id: connection.id,
platform_version: connection.platform_version,
connection_definition_id: connection.connection_definition_id,
r#type: connection.r#type,
key: connection.key,
group: connection.group,
name: connection.name,
environment: connection.environment,
platform: connection.platform,
secrets_service_id: connection.secrets_service_id,
event_access_id: connection.event_access_id,
identity: connection.identity,
identity_type: connection.identity_type,
settings: connection.settings,
throughput: connection.throughput,
ownership: connection.ownership,
has_error: connection.has_error,
error: connection.error,
oauth: connection.oauth,
record_metadata: connection.record_metadata,
}))
Ok(Json(conn.into()))
}

async fn generate_k8s_specs_and_secret(
Expand Down
2 changes: 2 additions & 0 deletions integrationos-api/src/logic/connection_definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct CreateRequest {
pub settings: Settings,
pub paths: Paths,
pub test_connection: Option<Id>,
pub test_delay_in_millis: Option<i16>,
pub active: bool,
#[serde(default)]
pub markdown: Option<String>,
Expand Down Expand Up @@ -358,6 +359,7 @@ impl RequestExt for CreateRequest {
paths: self.paths.clone(),
settings: self.settings.clone(),
hidden: false,
test_delay_in_millis: self.test_delay_in_millis,
record_metadata: RecordMetadata::default(),
};

Expand Down
1 change: 1 addition & 0 deletions integrationos-api/tests/standard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ fn test_json_connection_definition() {
},
hidden: true,
test_connection: Some(Id::test(IdPrefix::Connection)),
test_delay_in_millis: None,
record_metadata: RecordMetadata::test(),
};

Expand Down
2 changes: 1 addition & 1 deletion integrationos-database/src/algebra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn on_error_callback(
None,
))?;

let path = format!("{base_path}/v1/database-connection-lost/{connection_id}");
let path = format!("{base_path}/v1/event-callbacks/database-connection-lost/{connection_id}");

let authorization = Claims::from_secret(jwt_secret.as_str())?;
let payload = ConnectionLostReason {
Expand Down
11 changes: 10 additions & 1 deletion integrationos-database/src/algebra/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,16 @@ impl Storage for PostgresDatabaseConnection {
}

async fn probe(&self) -> Result<bool, IntegrationOSError> {
self.execute_raw("SELECT 1").await.map(|_| true)
let result = self.execute_raw("SELECT 1").await.map(|_| true);

if result == Ok(true) {
result
} else {
Err(ApplicationError::bad_request(
"Failed to probe database",
None,
))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion integrationos-database/tests/http/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn test_kill_signal() -> Result<Unit, IntegrationOSError> {
.create_async()
.await;

let path = format!("/v1/database-connection-lost/{connection_id}");
let path = format!("/v1/event-callbacks/database-connection-lost/{connection_id}");
let body = ConnectionLostReason {
reason: "Deserialization error: Failed to deserialize secret: error decoding response body"
.to_string(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{api_model_config::AuthMethod, ConnectionType};
use crate::id::{prefix::IdPrefix, Id};
use crate::id::Id;
use crate::prelude::shared::{record_metadata::RecordMetadata, settings::Settings};
use serde::{Deserialize, Serialize};
use strum::{self, AsRefStr, Display};
Expand Down Expand Up @@ -27,6 +27,8 @@ pub struct ConnectionDefinition {
pub settings: Settings,
pub hidden: bool,
pub test_connection: Option<Id>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub test_delay_in_millis: Option<i16>,
#[serde(flatten, default)]
pub record_metadata: RecordMetadata,
}
Expand Down Expand Up @@ -83,66 +85,6 @@ pub struct ModelSorting {
}

impl ConnectionDefinition {
pub fn new(
name: String,
description: String,
platform: String,
platform_version: String,
category: String,
image: String,
tags: Vec<String>,
) -> Self {
let key = format!("api::{}::{}", platform, platform_version);

Self {
id: Id::now(IdPrefix::ConnectionDefinition),
platform_version,
platform: platform.clone(),
r#type: ConnectionDefinitionType::Api,
status: ConnectionStatus::Beta,
name: name.clone(),
key,
frontend: Frontend {
spec: Spec {
title: name.clone(),
description: description.clone(),
platform,
category,
image,
tags,
helper_link: None,
markdown: None,
},
connection_form: ConnectionForm {
name,
description,
form_data: vec![],
},
},
test_connection: None,
auth_secrets: vec![],
auth_method: None,
multi_env: false,
paths: Paths {
id: None,
event: None,
payload: None,
timestamp: None,
secret: None,
signature: None,
cursor: None,
},
settings: Settings {
parse_webhook_body: false,
show_secret: false,
allow_custom_events: false,
oauth: false,
},
hidden: true,
record_metadata: RecordMetadata::default(),
}
}

pub fn to_connection_type(&self) -> super::ConnectionType {
match self.r#type {
ConnectionDefinitionType::Api => ConnectionType::Api {},
Expand Down
27 changes: 27 additions & 0 deletions integrationos-domain/src/domain/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,33 @@ pub struct SanitizedConnection {
pub record_metadata: RecordMetadata,
}

impl From<Connection> for SanitizedConnection {
fn from(conn: Connection) -> Self {
Self {
id: conn.id,
platform_version: conn.platform_version,
connection_definition_id: conn.connection_definition_id,
r#type: conn.r#type,
key: conn.key,
group: conn.group,
name: conn.name,
environment: conn.environment,
platform: conn.platform,
secrets_service_id: conn.secrets_service_id,
event_access_id: conn.event_access_id,
identity: conn.identity,
identity_type: conn.identity_type,
settings: conn.settings,
throughput: conn.throughput,
ownership: conn.ownership,
oauth: conn.oauth,
has_error: conn.has_error,
error: conn.error,
record_metadata: conn.record_metadata,
}
}
}

impl SanitizedConnection {
pub fn to_value(&self) -> Value {
serde_json::to_value(self).unwrap_or(Value::Null)
Expand Down

0 comments on commit 17dbc42

Please sign in to comment.