diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index c950cdc0f1..dbe114fd89 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -27,6 +27,7 @@ use crate::errors::IdDecodeError; use crate::id_util::IdDecoder; use crate::id_util::IdEncoder; use crate::id_util::IdResourceType; +use crate::invocation; use crate::invocation::{InvocationTarget, InvocationTargetType, WorkflowHandlerType}; use crate::time::MillisSinceEpoch; @@ -130,13 +131,19 @@ pub type EntryIndex = u32; /// which identifies a consecutive range of partition keys. pub type PartitionKey = u64; -/// Returns the partition key computed from either the service_key, or idempotency_key, if possible +/// Returns the partition key. The precedence is as follows: +/// +/// 1. If there is a service key, that's what we use to compute the key +/// 2. Otherwise, we use the concurrency key +/// 3. Otherwise, we use the idempotency key fn deterministic_partition_key( service_key: Option<&str>, + concurrency_key: Option<&str>, idempotency_key: Option<&str>, ) -> Option { service_key .map(partitioner::HashPartitioner::compute_partition_key) + .or_else(|| concurrency_key.map(partitioner::HashPartitioner::compute_partition_key)) .or_else(|| idempotency_key.map(partitioner::HashPartitioner::compute_partition_key)) } @@ -422,12 +429,19 @@ pub trait WithInvocationId { pub type EncodedInvocationId = [u8; InvocationId::SIZE_IN_BYTES]; impl InvocationId { - pub fn generate(invocation_target: &InvocationTarget, idempotency_key: Option<&str>) -> Self { + pub fn generate( + invocation_target: &InvocationTarget, + invocation_concurrency: Option<&invocation::Concurrency>, + idempotency_key: Option<&str>, + ) -> Self { // --- Partition key generation let partition_key = // Either try to generate the deterministic partition key, if possible deterministic_partition_key( - invocation_target.key().map(|bs| bs.as_ref()), + invocation_target.key() + .map(|bs| bs.as_ref()), + invocation_concurrency + .and_then(|c| c.inbox_key().map(|bs| bs.as_ref())), idempotency_key, ) // If no deterministic partition key can be generated, just pick a random number @@ -585,6 +599,8 @@ impl IdempotencyId { // * For services with key, the partition key is the hash(service key), this due to the virtual object locking requirement. let partition_key = deterministic_partition_key( service_key.as_ref().map(|bs| bs.as_ref()), + // This data structure won't need anymore the partition key anymore, by the time we enable the new custom concurrency feature. + None, Some(&idempotency_key), ) .expect("A deterministic partition key can always be generated for idempotency id"); @@ -975,7 +991,7 @@ mod mocks { impl InvocationId { pub fn mock_generate(invocation_target: &InvocationTarget) -> Self { - InvocationId::generate(invocation_target, None) + InvocationId::generate(invocation_target, None, None) } pub fn mock_random() -> Self { @@ -1132,11 +1148,28 @@ mod tests { let idempotent_key = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); assert_eq!( - InvocationId::generate(&invocation_target, Some(&idempotent_key)), - InvocationId::generate(&invocation_target, Some(&idempotent_key)) + InvocationId::generate(&invocation_target, None, Some(&idempotent_key)), + InvocationId::generate(&invocation_target, None, Some(&idempotent_key)) ); } + #[test] + fn deterministic_partition_key_for_service_request() { + let invocation_target = InvocationTarget::mock_service(); + let concurrency = invocation::Concurrency::Sequential { + inbox_target: invocation_target.service_name().clone(), + inbox_key: Alphanumeric + .sample_string(&mut rand::thread_rng(), 16) + .into(), + }; + + let id1 = InvocationId::generate(&invocation_target, Some(&concurrency), None); + let id2 = InvocationId::generate(&invocation_target, Some(&concurrency), None); + + assert_eq!(id1.partition_key(), id2.partition_key()); + assert_ne!(id1.invocation_uuid(), id2.invocation_uuid()); + } + #[test] fn deterministic_invocation_id_for_workflow_request() { let invocation_target = InvocationTarget::mock_workflow(); diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index 240760f7c7..84b00ae6ed 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -273,6 +273,78 @@ impl fmt::Display for InvocationTarget { } } +/// Concurrency guarantee of the invocation request. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum Concurrency { + /// Invocation executes sequential wrt the inbox address (target + key) + Sequential { + /// fka ServiceId.name + inbox_target: ByteString, + /// fka ServiceId.key + inbox_key: ByteString, + }, + /// No queueing, just execute the request + Concurrent, +} + +impl Concurrency { + pub fn inbox_key(&self) -> Option<&ByteString> { + if let Concurrency::Sequential { inbox_key, .. } = self { + Some(inbox_key) + } else { + None + } + } + + pub fn infer_target_default(invocation_target: &InvocationTarget) -> Concurrency { + match invocation_target { + InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Exclusive, name, key, .. } => { + Concurrency::Sequential { + inbox_target: name.clone(), + inbox_key: key.clone(), + } + } + InvocationTarget::Service { .. } | + InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Shared, .. } | + /* For workflow, there is no enqueueing as we have the behavior on existing invocation id that guarantees correctness */ + InvocationTarget::Workflow { .. } => Concurrency::Concurrent + } + } +} + +/// Behavior when sending an invocation request and the invocation already exists. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum IfExists { + /// Attach to the existing invocation + Attach, + /// Reply with "conflict" error response + ReplyConflict, + /// Just drop the request + Drop, +} + +impl IfExists { + pub fn infer_target_default( + invocation_target_type: InvocationTargetType, + has_idempotency_key: bool, + ) -> IfExists { + match (invocation_target_type, has_idempotency_key) { + (InvocationTargetType::Workflow(WorkflowHandlerType::Workflow), _) => { + IfExists::ReplyConflict + } + (_, true) => IfExists::Attach, + _ => IfExists::Drop, + } + } +} + +/// Invocation request flow is as follows: +/// +/// 1. Invocation is proposed in the PP log. +/// 2. PP will first check if another invocation with the same id exists. If it exists, it applies the [`IfExists`], otherwise moves to point 3. +/// 3. If the invocation id doesn't exist, wait for the `execution_time` if present, otherwise continue immediately. +/// 4. Apply the given [`Concurrency`]. +/// 5. Finally execute it sending the request to the service endpoint. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct InvocationRequestHeader { pub id: InvocationId, @@ -280,12 +352,16 @@ pub struct InvocationRequestHeader { pub headers: Vec
, pub span_context: ServiceInvocationSpanContext, - /// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`]. - pub idempotency_key: Option, - + /// Behavior to apply on an existing invocation id. If not present, [`IfPresent::infer_target_default`] should be used. + pub if_exists: Option, /// Time when the request should be executed. If none, it's executed immediately. pub execution_time: Option, + /// Concurrency behavior to apply. If not present, [`Concurrency::infer_target_default`] should be used. + pub concurrency: Option, + /// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`]. + /// This value is propagated only for observability purposes, as the invocation id is already deterministic given the invocation id. + pub idempotency_key: Option, /// Retention duration of the completed status. If none, the completed status is not retained. pub completion_retention_duration: Option, } @@ -300,6 +376,8 @@ impl InvocationRequestHeader { idempotency_key: None, execution_time: None, completion_retention_duration: None, + if_exists: None, + concurrency: None, } } @@ -352,10 +430,13 @@ pub struct ServiceInvocation { pub source: Source, pub span_context: ServiceInvocationSpanContext, pub headers: Vec
, - /// Time when the request should be executed + pub idempotency_key: Option, + + pub if_exists: Option, pub execution_time: Option, + pub concurrency: Option, + pub completion_retention_duration: Option, - pub idempotency_key: Option, // Where to send the response, if any pub response_sink: Option, @@ -382,10 +463,12 @@ impl ServiceInvocation { span_context: request.header.span_context, headers: request.header.headers, execution_time: request.header.execution_time, + concurrency: request.header.concurrency, completion_retention_duration: request.header.completion_retention_duration, idempotency_key: request.header.idempotency_key, response_sink: None, submit_notification_sink: None, + if_exists: request.header.if_exists, } } @@ -403,9 +486,11 @@ impl ServiceInvocation { span_context: ServiceInvocationSpanContext::empty(), headers: vec![], execution_time: None, + concurrency: None, completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, + if_exists: None, } } @@ -884,6 +969,7 @@ impl InvocationQuery { handler_ty: WorkflowHandlerType::Workflow, }, None, + None, ), InvocationQuery::IdempotencyId(IdempotencyId { service_name, @@ -904,7 +990,7 @@ impl InvocationQuery { VirtualObjectHandlerType::Exclusive, ), }; - InvocationId::generate(&target, Some(idempotency_key.deref())) + InvocationId::generate(&target, None, Some(idempotency_key.deref())) } } } @@ -995,9 +1081,11 @@ mod mocks { span_context: Default::default(), headers: vec![], execution_time: None, + concurrency: None, completion_retention_duration: None, idempotency_key: None, submit_notification_sink: None, + if_exists: None, } } }