Skip to content

Commit

Permalink
feat: implementing ID lookup for passthrough (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Jan 29, 2025
1 parent 47a2a60 commit b243652
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 45 deletions.
13 changes: 8 additions & 5 deletions api/src/logic/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use axum::{
};
use entities::{
constant::PICA_PASSTHROUGH_HEADER,
ApplicationError, InternalError,
{
destination::{Action, Destination},
event_access::EventAccess,
},
destination::{Action, Destination},
event_access::EventAccess,
ApplicationError, InternalError, QUERY_BY_ID_PASSTHROUGH,
};
use http::{header::CONTENT_LENGTH, HeaderMap, HeaderName, Method, Uri};
use hyper::body::Bytes;
Expand Down Expand Up @@ -53,13 +51,18 @@ pub async fn passthrough_request(
)
.await?;

let id = headers
.get(QUERY_BY_ID_PASSTHROUGH)
.and_then(|h| h.to_str().ok());

tracing::info!("Executing {} request on {}", method, uri.path());

let destination = Destination {
platform: connection.platform.clone(),
action: Action::Passthrough {
path: uri.path().into(),
method,
id: id.map(|i| i.into()),
},
connection_key: connection.key.clone(),
};
Expand Down
1 change: 1 addition & 0 deletions entities/src/domain/constant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub const ENVIRONMENT_FILTER: &str = "environment";
pub const DUAL_ENVIRONMENT_HEADER: &str = "x-pica-show-all-environments";
pub const LIMIT_FILTER: &str = "limit";
pub const SKIP_FILTER: &str = "skip";
pub const QUERY_BY_ID_PASSTHROUGH: &str = "x-pica-action-id";

// JWT constants
pub const BEARER_PREFIX: &str = "Bearer ";
Expand Down
2 changes: 2 additions & 0 deletions entities/src/domain/pipeline/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Action {
method: http::Method,
#[cfg_attr(feature = "dummy", dummy(expr = "String::new().into()"))]
path: Arc<str>,
#[cfg_attr(feature = "dummy", dummy(default))]
id: Option<Arc<str>>,
},
Unified {
#[cfg_attr(feature = "dummy", dummy(expr = "String::new().into()"))]
Expand Down
97 changes: 57 additions & 40 deletions unified/src/unified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,41 +128,60 @@ impl UnifiedDestination {
destination: &Destination,
) -> Result<Option<ConnectionModelDefinition>, PicaError> {
match &destination.action {
Action::Passthrough { method, path } => {
let connection_model_definitions = self
.connection_model_definitions_store
.get_many(
Some(doc! {
"connectionPlatform": destination.platform.as_ref(),
"action": method.as_str(),
"supported": true
}),
None,
None,
None,
None,
)
.await?;
Action::Passthrough { method, path, id } => match id {
Some(id) => {
let connection_model_definitions = self
.connection_model_definitions_store
.get_many(
Some(doc! {
"_id": id.to_string(),
}),
None,
None,
None,
None,
)
.await?;

let routes = connection_model_definitions
.iter()
.map(|c| match c.platform_info {
PlatformInfo::Api(ref c) => c.path.as_ref(),
});

let matched_route = match_route(path, routes.clone()).map(|r| r.to_string());

let mut connection_model_definitions = connection_model_definitions
.clone()
.into_iter()
.filter(|c| match c.platform_info {
PlatformInfo::Api(ref c) => matched_route
.as_ref()
.is_some_and(|mr| c.path.as_str() == mr),
});

if let Some(connection_model_definition) = connection_model_definitions.next() {
if connection_model_definitions.next().is_some() {
Ok(connection_model_definitions.first().cloned())
}
None => {
let connection_model_definitions = self
.connection_model_definitions_store
.get_many(
Some(doc! {
"connectionPlatform": destination.platform.as_ref(),
"action": method.as_str(),
"supported": true
}),
None,
None,
None,
None,
)
.await?;

let routes =
connection_model_definitions
.iter()
.map(|c| match c.platform_info {
PlatformInfo::Api(ref c) => c.path.as_ref(),
});

let matched_route = match_route(path, routes.clone()).map(|r| r.to_string());

let connection_model_definitions: Vec<ConnectionModelDefinition> =
connection_model_definitions
.clone()
.into_iter()
.filter(|c| match c.platform_info {
PlatformInfo::Api(ref c) => matched_route
.as_ref()
.is_some_and(|mr| c.path.as_str() == mr),
})
.collect();

if connection_model_definitions.len() > 1 {
error!("Multiple connection model definitions found for this path. Destination: {:?}, Routes: {:?}", destination, routes);

return Err(InternalError::invalid_argument(
Expand All @@ -171,11 +190,9 @@ impl UnifiedDestination {
));
}

Ok(Some(connection_model_definition))
} else {
Ok(None)
Ok(connection_model_definitions.first().cloned())
}
}
},
Action::Unified { name, action, .. } => Ok(self
.connection_model_definitions_store
.collection
Expand Down Expand Up @@ -503,7 +520,7 @@ impl UnifiedDestination {

build_unified_response(config, metadata, is_passthrough)(body, pagination, passthrough, params, status, headers)
}
Action::Passthrough { method, path } => Err(InternalError::invalid_argument(
Action::Passthrough { method, path, .. } => Err(InternalError::invalid_argument(
&format!("Passthrough action is not supported for destination {}, in method {method} and path {path}", key.connection_key),
None,
)),
Expand Down Expand Up @@ -577,7 +594,7 @@ impl UnifiedDestination {

// Template the route for passthrough actions
let templated_config = match &destination.action {
Action::Passthrough { method: _, path } => {
Action::Passthrough { path, .. } => {
let mut config_clone = (*config).clone();
let PlatformInfo::Api(ref mut c) = config_clone.platform_info;
let template = template_route(c.path.clone(), path.to_string());
Expand Down

0 comments on commit b243652

Please sign in to comment.