From 4121cc0700cf47a16f0861655f1906d810085fba Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Sun, 19 Jan 2025 21:01:29 -0500 Subject: [PATCH] move PauseClient into Runtime (#33309) PauseClient is really a global at the same level as Runtime -- based on the Runtime we might decide to advance time really quickly for tests, make randomness deterministic, start threads deterministically, or pause at specific labels to allow race conditions to be tested. I was motivated to do this because I was adding new race tests for QueryCache, and I found myself passing PauseClient through many layers of code that already have Runtime and also already have multiple PauseClients. This felt redundant, and I can clean up a lot of code by just passing through the runtime. One side effect is passing Runtime to a couple structs that don't already have it, like DatabaseSnapshot. GitOrigin-RevId: f99a8625946fc2356103b5b8e294aa31fc6e0007 --- crates/application/src/api.rs | 3 - .../src/application_function_runner/mod.rs | 10 +-- crates/application/src/deploy_config.rs | 2 - .../application/src/exports/export_storage.rs | 4 +- crates/application/src/exports/mod.rs | 2 +- crates/application/src/exports/worker.rs | 3 - crates/application/src/lib.rs | 29 ++------- crates/application/src/scheduled_jobs/mod.rs | 20 ++---- crates/application/src/schema_worker/mod.rs | 2 +- .../snapshot_import/import_file_storage.rs | 2 - crates/application/src/snapshot_import/mod.rs | 16 +---- .../src/snapshot_import/progress.rs | 2 - .../application/src/snapshot_import/tests.rs | 20 ++---- .../application/src/snapshot_import/worker.rs | 3 - crates/application/src/test_helpers.rs | 20 ------ crates/application/src/tests/mutation.rs | 49 ++++++--------- crates/application/src/tests/occ_retries.rs | 17 ++---- crates/application/src/tests/query_cache.rs | 26 +++----- .../src/tests/returns_validation.rs | 2 - .../application/src/tests/scheduled_jobs.rs | 46 ++++++++------ crates/async_lru/src/async_lru.rs | 61 +++++-------------- crates/common/src/runtime/mod.rs | 3 + crates/common/src/runtime/testing/mod.rs | 17 ++++++ crates/convex_macro/src/lib.rs | 32 ++++++++-- crates/database/src/committer.rs | 3 +- crates/database/src/database.rs | 61 ++++++++----------- crates/database/src/index_worker.rs | 2 +- .../src/index_workers/search_flusher.rs | 16 +---- crates/database/src/search_index_bootstrap.rs | 7 +-- crates/database/src/table_iteration.rs | 32 +++++----- crates/database/src/table_summary.rs | 50 ++++++++------- .../database/src/test_helpers/db_fixtures.rs | 3 +- crates/database/src/tests/mod.rs | 7 +-- .../src/tests/randomized_search_tests.rs | 10 +-- .../database/src/tests/vector_test_utils.rs | 6 +- crates/database/src/tests/vector_tests.rs | 5 +- .../database/src/text_index_worker/flusher.rs | 8 --- .../src/vector_index_worker/flusher.rs | 25 ++++---- .../function_runner/src/in_memory_indexes.rs | 4 +- .../src/in_process_function_runner.rs | 8 +-- crates/isolate/src/client.rs | 40 ++++++------ crates/isolate/src/isolate_worker.rs | 47 +++----------- crates/isolate/src/test_helpers.rs | 17 ++---- crates/local_backend/src/lib.rs | 4 -- .../src/node_action_callbacks.rs | 2 - crates/runtime/src/prod.rs | 5 ++ 46 files changed, 288 insertions(+), 465 deletions(-) diff --git a/crates/application/src/api.rs b/crates/application/src/api.rs index e6597ca27..bf0e0a470 100644 --- a/crates/application/src/api.rs +++ b/crates/application/src/api.rs @@ -14,7 +14,6 @@ use common::{ PublicFunctionPath, }, http::ResolvedHostname, - pause::PauseClient, runtime::Runtime, types::{ AllowedVisibility, @@ -355,7 +354,6 @@ impl ApplicationApi for Application { identity, mutation_identifier, caller, - PauseClient::new(), ) .await } @@ -381,7 +379,6 @@ impl ApplicationApi for Application { identity, mutation_identifier, caller, - PauseClient::new(), ) .await } diff --git a/crates/application/src/application_function_runner/mod.rs b/crates/application/src/application_function_runner/mod.rs index ee00f34fc..b5c6e13ca 100644 --- a/crates/application/src/application_function_runner/mod.rs +++ b/crates/application/src/application_function_runner/mod.rs @@ -52,7 +52,6 @@ use common::{ LogLines, }, minitrace_helpers::EncodedSpan, - pause::PauseClient, query_journal::QueryJournal, runtime::{ Runtime, @@ -724,7 +723,6 @@ impl ApplicationFunctionRunner { identity: Identity, mutation_identifier: Option, caller: FunctionCaller, - pause_client: PauseClient, ) -> anyhow::Result> { let timer = mutation_timer(); let result = self @@ -735,7 +733,6 @@ impl ApplicationFunctionRunner { identity, mutation_identifier, caller, - pause_client, ) .await; match &result { @@ -755,7 +752,6 @@ impl ApplicationFunctionRunner { identity: Identity, mutation_identifier: Option, caller: FunctionCaller, - pause_client: PauseClient, ) -> anyhow::Result> { if path.is_system() && !(identity.is_admin() || identity.is_system()) { anyhow::bail!(unauthorized_error("mutation")); @@ -788,6 +784,7 @@ impl ApplicationFunctionRunner { .database .begin_with_usage(identity.clone(), usage_tracker.clone()) .await?; + let pause_client = self.runtime.pause_client(); pause_client.wait("retry_mutation_loop_start").await; let identity = tx.inert_identity(); @@ -1888,7 +1885,6 @@ impl ActionCallbacks for ApplicationFunctionRunner { FunctionCaller::Action { parent_scheduled_job: context.parent_scheduled_job, }, - PauseClient::new(), ) .await .map(|r| match r { @@ -1969,7 +1965,6 @@ impl ActionCallbacks for ApplicationFunctionRunner { .execute_with_occ_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "app_funrun_storage_store_file_entry", |tx| { async { @@ -2001,7 +1996,6 @@ impl ActionCallbacks for ApplicationFunctionRunner { .execute_with_occ_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "app_funrun_storage_delete", |tx| { async { @@ -2032,7 +2026,6 @@ impl ActionCallbacks for ApplicationFunctionRunner { .execute_with_occ_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "app_funrun_schedule_job", |tx| { let path = scheduled_path.clone(); @@ -2071,7 +2064,6 @@ impl ActionCallbacks for ApplicationFunctionRunner { .execute_with_occ_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "app_funrun_cancel_job", |tx| { async { diff --git a/crates/application/src/deploy_config.rs b/crates/application/src/deploy_config.rs index 3299eb633..c1cf14eef 100644 --- a/crates/application/src/deploy_config.rs +++ b/crates/application/src/deploy_config.rs @@ -25,7 +25,6 @@ use common::{ Resource, }, errors::JsError, - pause::PauseClient, runtime::{ Runtime, UnixTimestamp, @@ -243,7 +242,6 @@ impl Application { .execute_with_occ_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), WriteSource::new("start_push"), |tx| { async move { diff --git a/crates/application/src/exports/export_storage.rs b/crates/application/src/exports/export_storage.rs index 12e560017..39a327126 100644 --- a/crates/application/src/exports/export_storage.rs +++ b/crates/application/src/exports/export_storage.rs @@ -71,7 +71,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>( let mut table_upload = zip_snapshot_upload .start_system_table(path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone()) .await?; - let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000); let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); pin_mut!(stream); while let Some((doc, _ts)) = stream.try_next().await? { @@ -95,7 +95,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>( } table_upload.complete().await?; - let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000); let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); pin_mut!(stream); while let Some((doc, _ts)) = stream.try_next().await? { diff --git a/crates/application/src/exports/mod.rs b/crates/application/src/exports/mod.rs index 5dff7c6c5..3dfaba1dc 100644 --- a/crates/application/src/exports/mod.rs +++ b/crates/application/src/exports/mod.rs @@ -199,7 +199,7 @@ pub async fn write_table<'a, 'b: 'a, RT: Runtime>( .start_table(path_prefix, table_name.clone()) .await?; - let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000); let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); pin_mut!(stream); diff --git a/crates/application/src/exports/worker.rs b/crates/application/src/exports/worker.rs index 2a863fdbf..5ca4cd381 100644 --- a/crates/application/src/exports/worker.rs +++ b/crates/application/src/exports/worker.rs @@ -11,7 +11,6 @@ use common::{ document::ParsedDocument, errors::report_error, execution_context::ExecutionId, - pause::PauseClient, runtime::Runtime, types::UdfIdentifier, RequestId, @@ -208,7 +207,6 @@ impl ExportWorker { .execute_with_occ_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), "export_worker_update_progress", move |tx| { let msg = msg.clone(); @@ -272,7 +270,6 @@ impl ExportWorker { .execute_with_occ_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), "export_worker_mark_complete", |tx| { let object_key = object_key.clone(); diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index b8f5160db..c24a68a80 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -66,7 +66,6 @@ use common::{ log_lines::LogLines, log_streaming::LogSender, paths::FieldPath, - pause::PauseClient, persistence::Persistence, query_journal::QueryJournal, runtime::{ @@ -554,8 +553,6 @@ impl Application { node_actions: Actions, log_sender: Arc, log_visibility: Arc>, - snapshot_import_pause_client: PauseClient, - scheduled_jobs_pause_client: PauseClient, app_auth: Arc, cache: QueryCache, ) -> anyhow::Result { @@ -588,9 +585,8 @@ impl Application { segment_term_metadata_fetcher, ); let search_worker = Arc::new(Mutex::new(search_worker)); - let search_and_vector_bootstrap_worker = Arc::new(Mutex::new( - database.start_search_and_vector_bootstrap(PauseClient::new()), - )); + let search_and_vector_bootstrap_worker = + Arc::new(Mutex::new(database.start_search_and_vector_bootstrap())); let table_summary_worker = TableSummaryWorker::start(runtime.clone(), database.clone(), persistence.clone()); let schema_worker = Arc::new(Mutex::new(runtime.spawn( @@ -633,7 +629,6 @@ impl Application { database.clone(), runner.clone(), function_log.clone(), - scheduled_jobs_pause_client, ); let cron_job_executor_fut = CronJobExecutor::start( @@ -663,7 +658,6 @@ impl Application { snapshot_imports_storage.clone(), file_storage.clone(), database.usage_counter().clone(), - snapshot_import_pause_client, ); let snapshot_import_worker = Arc::new(Mutex::new( runtime.spawn("snapshot_import_worker", snapshot_import_worker), @@ -996,7 +990,6 @@ impl Application { // Identifier used to make this mutation idempotent. mutation_identifier: Option, caller: FunctionCaller, - pause_client: PauseClient, ) -> anyhow::Result> { identity.ensure_can_run_function(UdfType::Mutation)?; let block_logging = self @@ -1016,7 +1009,6 @@ impl Application { identity, mutation_identifier, caller, - pause_client, ) .await { @@ -1258,7 +1250,6 @@ impl Application { identity, None, caller, - PauseClient::new(), ) .await .map(|res| { @@ -2817,7 +2808,6 @@ impl Application { { self.execute_with_audit_log_events_and_occ_retries_with_pause_client( identity, - PauseClient::new(), write_source, f, ) @@ -2841,7 +2831,6 @@ impl Application { { self.execute_with_audit_log_events_and_occ_retries_with_pause_client( identity, - PauseClient::new(), write_source, f, ) @@ -2851,7 +2840,6 @@ impl Application { pub async fn execute_with_audit_log_events_and_occ_retries_with_pause_client<'a, F, T>( &self, identity: Identity, - pause_client: PauseClient, write_source: impl Into, f: F, ) -> anyhow::Result<(T, OccRetryStats)> @@ -2865,13 +2853,9 @@ impl Application { { let db = self.database.clone(); let (ts, (t, events), stats) = db - .execute_with_occ_retries( - identity, - FunctionUsageTracker::new(), - pause_client, - write_source, - |tx| Self::insert_deployment_audit_log_events(tx, &f).into(), - ) + .execute_with_occ_retries(identity, FunctionUsageTracker::new(), write_source, |tx| { + Self::insert_deployment_audit_log_events(tx, &f).into() + }) .await?; // Send deployment audit logs // TODO CX-5139 Remove this when audit logs are being processed in LogManager. @@ -2890,7 +2874,6 @@ impl Application { &'a self, identity: Identity, usage: FunctionUsageTracker, - pause_client: PauseClient, write_source: impl Into, f: F, ) -> anyhow::Result<(Timestamp, T)> @@ -2900,7 +2883,7 @@ impl Application { F: for<'b> Fn(&'b mut Transaction) -> ShortBoxFuture<'b, 'a, anyhow::Result>, { self.database - .execute_with_occ_retries(identity, usage, pause_client, write_source, f) + .execute_with_occ_retries(identity, usage, write_source, f) .await .map(|(ts, t, _)| (ts, t)) } diff --git a/crates/application/src/scheduled_jobs/mod.rs b/crates/application/src/scheduled_jobs/mod.rs index c2804d1a6..0b4ae9fbb 100644 --- a/crates/application/src/scheduled_jobs/mod.rs +++ b/crates/application/src/scheduled_jobs/mod.rs @@ -33,10 +33,7 @@ use common::{ UDF_EXECUTOR_OCC_MAX_RETRIES, }, minitrace_helpers::get_sampled_span, - pause::{ - Fault, - PauseClient, - }, + pause::Fault, query::{ IndexRange, IndexRangeExpression, @@ -118,7 +115,6 @@ impl ScheduledJobRunner { database: Database, runner: Arc>, function_log: FunctionExecutionLog, - pause_client: PauseClient, ) -> Self { let executor_fut = ScheduledJobExecutor::start( rt.clone(), @@ -126,7 +122,6 @@ impl ScheduledJobRunner { database.clone(), runner, function_log, - pause_client, ); let executor = Arc::new(Mutex::new(rt.spawn("scheduled_job_executor", executor_fut))); @@ -148,7 +143,6 @@ impl ScheduledJobRunner { pub struct ScheduledJobExecutor { context: ScheduledJobContext, - pause_client: PauseClient, } impl Deref for ScheduledJobExecutor { @@ -166,7 +160,6 @@ pub struct ScheduledJobContext { database: Database, runner: Arc>, function_log: FunctionExecutionLog, - pause_client: PauseClient, } /// This roughly matches tokio's permits that it uses as part of cooperative @@ -182,7 +175,6 @@ impl ScheduledJobExecutor { database: Database, runner: Arc>, function_log: FunctionExecutionLog, - pause_client: PauseClient, ) -> impl Future + Send { let mut executor = Self { context: ScheduledJobContext { @@ -191,9 +183,7 @@ impl ScheduledJobExecutor { database, runner, function_log, - pause_client: pause_client.clone(), }, - pause_client, }; async move { let mut backoff = @@ -222,9 +212,7 @@ impl ScheduledJobExecutor { database, runner, function_log, - pause_client: PauseClient::new(), }, - pause_client: PauseClient::new(), } } @@ -245,6 +233,7 @@ impl ScheduledJobExecutor { async fn run(&mut self, backoff: &mut Backoff) -> anyhow::Result<()> { tracing::info!("Starting scheduled job executor"); + let pause_client = self.context.rt.pause_client(); let (job_finished_tx, mut job_finished_rx) = mpsc::channel(*SCHEDULED_JOB_EXECUTION_PARALLELISM); let mut running_job_ids = HashSet::new(); @@ -293,7 +282,7 @@ impl ScheduledJobExecutor { select_biased! { job_id = job_finished_rx.recv().fuse() => { if let Some(job_id) = job_id { - self.pause_client.wait(SCHEDULED_JOB_EXECUTED).await; + pause_client.wait(SCHEDULED_JOB_EXECUTED).await; running_job_ids.remove(&job_id); } else { anyhow::bail!("Job results channel closed, this is unexpected!"); @@ -645,6 +634,7 @@ impl ScheduledJobContext { let identity = tx.inert_identity(); let namespace = tx.table_mapping().tablet_namespace(job_id.tablet_id)?; let path = job.path.clone(); + let pause_client = self.rt.pause_client(); let udf_args = job.udf_args()?; let result = self @@ -674,7 +664,7 @@ impl ScheduledJobContext { SchedulerModel::new(&mut tx, namespace) .complete(job_id, ScheduledJobState::Success) .await?; - if let Fault::Error(e) = self.pause_client.wait(SCHEDULED_JOB_COMMITTING).await { + if let Fault::Error(e) = pause_client.wait(SCHEDULED_JOB_COMMITTING).await { tracing::info!("Injected error before committing mutation"); return Err(e); }; diff --git a/crates/application/src/schema_worker/mod.rs b/crates/application/src/schema_worker/mod.rs index 0e95b3fc4..41638bb14 100644 --- a/crates/application/src/schema_worker/mod.rs +++ b/crates/application/src/schema_worker/mod.rs @@ -160,7 +160,7 @@ impl SchemaWorker { )?; for table_name in tables_to_check { - let table_iterator = self.database.table_iterator(ts, 1000, None); + let table_iterator = self.database.table_iterator(ts, 1000); let tablet_id = table_mapping.name_to_tablet()(table_name.clone())?; let stream = table_iterator.stream_documents_in_table( tablet_id, diff --git a/crates/application/src/snapshot_import/import_file_storage.rs b/crates/application/src/snapshot_import/import_file_storage.rs index 7390a0b58..eca03394d 100644 --- a/crates/application/src/snapshot_import/import_file_storage.rs +++ b/crates/application/src/snapshot_import/import_file_storage.rs @@ -16,7 +16,6 @@ use common::{ PeekableExt, TryPeekableExt, }, - pause::PauseClient, runtime::Runtime, types::StorageUuid, }; @@ -185,7 +184,6 @@ pub async fn import_storage_table( .execute_with_overloaded_retries( identity.clone(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_storage_table", |tx| { async { diff --git a/crates/application/src/snapshot_import/mod.rs b/crates/application/src/snapshot_import/mod.rs index 088144407..b5f54d017 100644 --- a/crates/application/src/snapshot_import/mod.rs +++ b/crates/application/src/snapshot_import/mod.rs @@ -33,7 +33,6 @@ use common::{ TRANSACTION_MAX_NUM_USER_WRITES, TRANSACTION_MAX_USER_WRITE_SIZE_BYTES, }, - pause::PauseClient, runtime::Runtime, types::{ FullyQualifiedObjectKey, @@ -174,7 +173,6 @@ struct SnapshotImportExecutor { file_storage: FileStorage, usage_tracking: UsageCounter, backoff: Backoff, - pause_client: PauseClient, } impl SnapshotImportExecutor { @@ -192,7 +190,6 @@ impl SnapshotImportExecutor { .execute_with_overloaded_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_waiting_for_confirmation", |tx| { async { @@ -220,7 +217,6 @@ impl SnapshotImportExecutor { .execute_with_overloaded_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_fail", |tx| { async { @@ -257,7 +253,6 @@ impl SnapshotImportExecutor { .execute_with_overloaded_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshop_import_complete", |tx| { async { @@ -280,7 +275,6 @@ impl SnapshotImportExecutor { .execute_with_overloaded_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_fail", |tx| { async { @@ -365,7 +359,8 @@ impl SnapshotImportExecutor { object_attributes.size, ); - self.pause_client.wait("before_finalize_import").await; + let pause_client = self.runtime.pause_client(); + pause_client.wait("before_finalize_import").await; let (ts, _documents_deleted) = finalize_import( &self.database, &self.usage_tracking, @@ -447,7 +442,6 @@ pub async fn start_stored_import( .execute_with_overloaded_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_store_uploaded", |tx| { async { @@ -482,7 +476,6 @@ pub async fn perform_import( .execute_with_overloaded_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_perform", |tx| { async { @@ -515,7 +508,6 @@ pub async fn cancel_import( .execute_with_overloaded_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_cancel", |tx| { async { @@ -876,7 +868,6 @@ async fn finalize_import( .execute_with_overloaded_retries( identity, FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_finalize", |tx| { async { @@ -1263,7 +1254,6 @@ async fn insert_import_objects( .execute_with_overloaded_retries( identity.clone(), usage, - PauseClient::new(), "snapshot_import_insert_objects", |tx| { async { @@ -1368,7 +1358,6 @@ async fn prepare_table_for_import( .execute_with_overloaded_retries( identity.clone(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_prepare_table", |tx| { async { @@ -1438,7 +1427,6 @@ async fn backfill_and_enable_indexes_on_table( .execute_with_overloaded_retries( identity.clone(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_enable_indexes", |tx| { async { diff --git a/crates/application/src/snapshot_import/progress.rs b/crates/application/src/snapshot_import/progress.rs index 9a9ce1690..8630209da 100644 --- a/crates/application/src/snapshot_import/progress.rs +++ b/crates/application/src/snapshot_import/progress.rs @@ -1,6 +1,5 @@ use common::{ components::ComponentPath, - pause::PauseClient, runtime::Runtime, types::TableName, }; @@ -52,7 +51,6 @@ pub async fn add_checkpoint_message( .execute_with_overloaded_retries( identity.clone(), FunctionUsageTracker::new(), - PauseClient::new(), "snapshot_import_add_checkpoint_message", |tx| { async { diff --git a/crates/application/src/snapshot_import/tests.rs b/crates/application/src/snapshot_import/tests.rs index c5fb80c8d..84d3ad7bc 100644 --- a/crates/application/src/snapshot_import/tests.rs +++ b/crates/application/src/snapshot_import/tests.rs @@ -109,10 +109,7 @@ use crate::{ ImportFormat, ImportMode, }, - test_helpers::{ - ApplicationFixtureArgs, - ApplicationTestExt, - }, + test_helpers::ApplicationTestExt, Application, }; @@ -470,16 +467,11 @@ Interrupting `npx convex import` will not cancel it."# // Hard to control timing in race test with background job moving state forward. #[convex_macro::test_runtime] -async fn import_races_with_schema_update(rt: TestRuntime) -> anyhow::Result<()> { - let (pause_controller, pause_client) = PauseController::new(); - let app = Application::new_for_tests_with_args( - &rt, - ApplicationFixtureArgs { - snapshot_import_pause_client: Some(pause_client), - ..Default::default() - }, - ) - .await?; +async fn import_races_with_schema_update( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + let app = Application::new_for_tests(&rt).await?; let table_name = "table1"; let test_csv = r#" a diff --git a/crates/application/src/snapshot_import/worker.rs b/crates/application/src/snapshot_import/worker.rs index 294d048b1..4c14da252 100644 --- a/crates/application/src/snapshot_import/worker.rs +++ b/crates/application/src/snapshot_import/worker.rs @@ -7,7 +7,6 @@ use std::{ use common::{ backoff::Backoff, errors::report_error, - pause::PauseClient, runtime::Runtime, }; use database::Database; @@ -40,7 +39,6 @@ impl SnapshotImportWorker { snapshot_imports_storage: Arc, file_storage: FileStorage, usage_tracking: UsageCounter, - pause_client: PauseClient, ) -> impl Future + Send { let mut worker = SnapshotImportExecutor { runtime, @@ -48,7 +46,6 @@ impl SnapshotImportWorker { snapshot_imports_storage, file_storage, usage_tracking, - pause_client, backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), }; async move { diff --git a/crates/application/src/test_helpers.rs b/crates/application/src/test_helpers.rs index d40e003d7..4e6d45148 100644 --- a/crates/application/src/test_helpers.rs +++ b/crates/application/src/test_helpers.rs @@ -27,10 +27,6 @@ use common::{ UDF_CACHE_MAX_SIZE, }, log_streaming::NoopLogSender, - pause::{ - PauseClient, - PauseController, - }, persistence::Persistence, runtime::Runtime, testing::TestPersistence, @@ -123,22 +119,10 @@ pub static OBJECTS_TABLE_COMPONENT: ComponentId = ComponentId::test_user(); #[derive(Default)] pub struct ApplicationFixtureArgs { pub tp: Option, - pub snapshot_import_pause_client: Option, - pub scheduled_jobs_pause_client: PauseClient, - pub function_runner_pause_client: PauseClient, pub event_logger: Option>, } impl ApplicationFixtureArgs { - pub fn with_scheduled_jobs_pause_client() -> (Self, PauseController) { - let (pause_controller, pause_client) = PauseController::new(); - let args = ApplicationFixtureArgs { - scheduled_jobs_pause_client: pause_client, - ..Default::default() - }; - (args, pause_controller) - } - pub fn with_event_logger(event_logger: Arc) -> Self { Self { event_logger: Some(event_logger), @@ -199,7 +183,6 @@ impl ApplicationTestExt for Application { let searcher = Arc::new(search::searcher::SearcherStub {}); let segment_term_metadata_fetcher = Arc::new(search::searcher::SearcherStub {}); let persistence = args.tp.unwrap_or_else(TestPersistence::new); - let snapshot_import_pause_client = args.snapshot_import_pause_client.unwrap_or_default(); let database = Database::load( Arc::new(persistence.clone()), rt.clone(), @@ -250,7 +233,6 @@ impl ApplicationTestExt for Application { }, database.clone(), fetch_client, - args.function_runner_pause_client, ) .await?, ); @@ -294,8 +276,6 @@ impl ApplicationTestExt for Application { actions, Arc::new(NoopLogSender), Arc::new(AllowLogging), - snapshot_import_pause_client, - args.scheduled_jobs_pause_client, Arc::new(ApplicationAuth::new( kb.clone(), Arc::new(NullAccessTokenAuth), diff --git a/crates/application/src/tests/mutation.rs b/crates/application/src/tests/mutation.rs index 9e01b90fc..a32f722e6 100644 --- a/crates/application/src/tests/mutation.rs +++ b/crates/application/src/tests/mutation.rs @@ -8,10 +8,7 @@ use common::{ PublicFunctionPath, }, knobs::UDF_EXECUTOR_OCC_MAX_RETRIES, - pause::{ - PauseClient, - PauseController, - }, + pause::PauseController, types::FunctionCaller, RequestId, }; @@ -38,10 +35,7 @@ use crate::{ Application, }; -async fn insert_object( - application: &Application, - pause_client: PauseClient, -) -> anyhow::Result { +async fn insert_object(application: &Application) -> anyhow::Result { let obj = json!({"an": "object"}); let result = application .mutation_udf( @@ -56,16 +50,12 @@ async fn insert_object( FunctionCaller::Action { parent_scheduled_job: None, }, - pause_client, ) .await??; Ok(JsonValue::from(result.value)) } -async fn insert_and_count( - application: &Application, - pause_client: PauseClient, -) -> anyhow::Result { +async fn insert_and_count(application: &Application) -> anyhow::Result { let obj = json!({"an": "object"}); let result = application .mutation_udf( @@ -80,7 +70,6 @@ async fn insert_and_count( FunctionCaller::Action { parent_scheduled_job: None, }, - pause_client, ) .await??; Ok(JsonValue::from(result.value) @@ -92,13 +81,13 @@ async fn insert_and_count( async fn test_mutation(rt: TestRuntime) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; - let result = insert_object(&application, PauseClient::new()).await?; + let result = insert_object(&application).await?; assert_eq!(result["an"], "object"); Ok(()) } #[convex_macro::test_runtime] -async fn test_mutation_occ_fail(rt: TestRuntime) -> anyhow::Result<()> { +async fn test_mutation_occ_fail(rt: TestRuntime, pause: PauseController) -> anyhow::Result<()> { let logger = BasicTestUsageEventLogger::new(); let application = Application::new_for_tests_with_args( &rt, @@ -107,9 +96,8 @@ async fn test_mutation_occ_fail(rt: TestRuntime) -> anyhow::Result<()> { .await?; application.load_udf_tests_modules().await?; - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold("retry_mutation_loop_start"); - let fut1 = insert_and_count(&application, pause_client); + let fut1 = insert_and_count(&application); let fut2 = async { let mut hold_guard = hold_guard; for i in 0..*UDF_EXECUTOR_OCC_MAX_RETRIES + 1 { @@ -117,13 +105,13 @@ async fn test_mutation_occ_fail(rt: TestRuntime) -> anyhow::Result<()> { .wait_for_blocked() .await .context("Didn't hit breakpoint?")?; - hold_guard = pause.hold("retry_mutation_loop_start"); // Do an entire mutation while we're paused - to create an OCC conflict on // the original insertion. - let count = insert_and_count(&application, PauseClient::new()).await?; + let count = insert_and_count(&application).await?; assert_eq!(count, i + 1); + hold_guard = pause.hold("retry_mutation_loop_start"); guard.unpause(); } Ok::<_, anyhow::Error>(()) @@ -180,7 +168,7 @@ async fn test_mutation_occ_fail(rt: TestRuntime) -> anyhow::Result<()> { } #[convex_macro::test_runtime] -async fn test_mutation_occ_success(rt: TestRuntime) -> anyhow::Result<()> { +async fn test_mutation_occ_success(rt: TestRuntime, pause: PauseController) -> anyhow::Result<()> { let logger = BasicTestUsageEventLogger::new(); let application = Application::new_for_tests_with_args( &rt, @@ -189,9 +177,8 @@ async fn test_mutation_occ_success(rt: TestRuntime) -> anyhow::Result<()> { .await?; application.load_udf_tests_modules().await?; - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold("retry_mutation_loop_start"); - let fut1 = insert_and_count(&application, pause_client); + let fut1 = insert_and_count(&application); let fut2 = async { let mut hold_guard = hold_guard; for i in 0..*UDF_EXECUTOR_OCC_MAX_RETRIES + 1 { @@ -199,16 +186,16 @@ async fn test_mutation_occ_success(rt: TestRuntime) -> anyhow::Result<()> { .wait_for_blocked() .await .context("Didn't hit breakpoint?")?; - hold_guard = pause.hold("retry_mutation_loop_start"); // N-1 retries, Nth one allow it to succeed if i < *UDF_EXECUTOR_OCC_MAX_RETRIES { // Do an entire mutation while we're paused - to create an OCC conflict on // the original insertion. - let count = insert_and_count(&application, PauseClient::new()).await?; + let count = insert_and_count(&application).await?; assert_eq!(count, i + 1); } + hold_guard = pause.hold("retry_mutation_loop_start"); guard.unpause(); } Ok::<_, anyhow::Error>(()) @@ -267,16 +254,18 @@ async fn test_mutation_occ_success(rt: TestRuntime) -> anyhow::Result<()> { } #[convex_macro::test_runtime] -async fn test_multiple_inserts_dont_occ(rt: TestRuntime) -> anyhow::Result<()> { +async fn test_multiple_inserts_dont_occ( + rt: TestRuntime, + pause: PauseController, +) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Insert an object to create the table (otherwise it'll OCC on table creation). - insert_object(&application, PauseClient::new()).await?; + insert_object(&application).await?; - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold("retry_mutation_loop_start"); - let fut1 = insert_object(&application, pause_client); + let fut1 = insert_object(&application); let fut2 = async { let guard = hold_guard .wait_for_blocked() @@ -285,7 +274,7 @@ async fn test_multiple_inserts_dont_occ(rt: TestRuntime) -> anyhow::Result<()> { // Do several entire mutations while we're paused. Shouldn't OCC. for _ in 0..5 { - let result = insert_object(&application, PauseClient::new()).await?; + let result = insert_object(&application).await?; assert_eq!(result["an"], "object"); } diff --git a/crates/application/src/tests/occ_retries.rs b/crates/application/src/tests/occ_retries.rs index c01306836..0b0636a53 100644 --- a/crates/application/src/tests/occ_retries.rs +++ b/crates/application/src/tests/occ_retries.rs @@ -1,8 +1,5 @@ use anyhow::Context; -use common::pause::{ - PauseClient, - PauseController, -}; +use common::pause::PauseController; use database::{ SystemMetadataModel, Transaction, @@ -39,14 +36,12 @@ async fn test_replace_tx( async fn test_replace_with_retries( application: &Application, - pause_client: PauseClient, id: ResolvedDocumentId, value: ConvexValue, ) -> anyhow::Result<()> { application .execute_with_audit_log_events_and_occ_retries_with_pause_client( Identity::system(), - pause_client, "test", move |tx| test_replace_tx(tx, id, value.clone()).into(), ) @@ -55,7 +50,7 @@ async fn test_replace_with_retries( } #[convex_macro::test_runtime] -async fn test_occ_fails(rt: TestRuntime) -> anyhow::Result<()> { +async fn test_occ_fails(rt: TestRuntime, pause: PauseController) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let identity = Identity::system(); let mut tx = application.begin(identity.clone()).await?; @@ -67,9 +62,8 @@ async fn test_occ_fails(rt: TestRuntime) -> anyhow::Result<()> { .await?; application.commit_test(tx).await?; - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold("retry_tx_loop_start"); - let fut1 = test_replace_with_retries(&application, pause_client, id, "value".try_into()?); + let fut1 = test_replace_with_retries(&application, id, "value".try_into()?); let fut2 = async { let mut hold_guard = hold_guard; @@ -92,7 +86,7 @@ async fn test_occ_fails(rt: TestRuntime) -> anyhow::Result<()> { } #[convex_macro::test_runtime] -async fn test_occ_succeeds(rt: TestRuntime) -> anyhow::Result<()> { +async fn test_occ_succeeds(rt: TestRuntime, pause: PauseController) -> anyhow::Result<()> { let application = Application::new_for_tests(&rt).await?; let identity = Identity::system(); let mut tx = application.begin(identity.clone()).await?; @@ -104,9 +98,8 @@ async fn test_occ_succeeds(rt: TestRuntime) -> anyhow::Result<()> { .await?; application.commit_test(tx).await?; - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold("retry_tx_loop_start"); - let fut1 = test_replace_with_retries(&application, pause_client, id, "value".try_into()?); + let fut1 = test_replace_with_retries(&application, id, "value".try_into()?); let fut2 = async { let mut hold_guard = hold_guard; diff --git a/crates/application/src/tests/query_cache.rs b/crates/application/src/tests/query_cache.rs index 32e7d4073..5958e5a3d 100644 --- a/crates/application/src/tests/query_cache.rs +++ b/crates/application/src/tests/query_cache.rs @@ -6,10 +6,7 @@ use common::{ ComponentPath, PublicFunctionPath, }, - pause::{ - PauseClient, - PauseController, - }, + pause::PauseController, types::FunctionCaller, RequestId, }; @@ -27,10 +24,7 @@ use serde_json::{ use value::ConvexValue; use crate::{ - test_helpers::{ - ApplicationFixtureArgs, - ApplicationTestExt, - }, + test_helpers::ApplicationTestExt, Application, }; @@ -75,7 +69,6 @@ async fn insert_object(application: &Application) -> anyhow::Result FunctionCaller::Action { parent_scheduled_job: None, }, - PauseClient::new(), ) .await??; Ok(result.value) @@ -323,16 +316,11 @@ async fn test_query_cache_without_checking_auth(rt: TestRuntime) -> anyhow::Resu } #[convex_macro::test_runtime] -async fn test_query_cache_unauthed_race(rt: TestRuntime) -> anyhow::Result<()> { - let (pause_controller, pause_client) = PauseController::new(); - let application = Application::new_for_tests_with_args( - &rt, - ApplicationFixtureArgs { - function_runner_pause_client: pause_client, - ..Default::default() - }, - ) - .await?; +async fn test_query_cache_unauthed_race( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; // Run the same query as different users, in parallel. diff --git a/crates/application/src/tests/returns_validation.rs b/crates/application/src/tests/returns_validation.rs index 59a106d31..d97fb6a3d 100644 --- a/crates/application/src/tests/returns_validation.rs +++ b/crates/application/src/tests/returns_validation.rs @@ -4,7 +4,6 @@ use common::{ ComponentPath, PublicFunctionPath, }, - pause::PauseClient, types::FunctionCaller, RequestId, }; @@ -42,7 +41,6 @@ async fn run_zero_arg_mutation( Identity::user(UserIdentity::test()), None, FunctionCaller::HttpEndpoint, - PauseClient::new(), ) .await } diff --git a/crates/application/src/tests/scheduled_jobs.rs b/crates/application/src/tests/scheduled_jobs.rs index b4eed2495..87ae9af91 100644 --- a/crates/application/src/tests/scheduled_jobs.rs +++ b/crates/application/src/tests/scheduled_jobs.rs @@ -12,7 +12,7 @@ use common::{ execution_context::ExecutionContext, pause::{ HoldGuard, - PauseClient, + PauseController, }, runtime::Runtime, types::FunctionCaller, @@ -50,7 +50,6 @@ use crate::{ SCHEDULED_JOB_EXECUTED, }, test_helpers::{ - ApplicationFixtureArgs, ApplicationTestExt, OBJECTS_TABLE, OBJECTS_TABLE_COMPONENT, @@ -99,9 +98,11 @@ async fn wait_for_scheduled_job_execution(hold_guard: HoldGuard) { } #[convex_macro::test_runtime] -async fn test_scheduled_jobs_success(rt: TestRuntime) -> anyhow::Result<()> { - let (args, pause_controller) = ApplicationFixtureArgs::with_scheduled_jobs_pause_client(); - let application = Application::new_for_tests_with_args(&rt, args).await?; +async fn test_scheduled_jobs_success( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let hold_guard = pause_controller.hold(SCHEDULED_JOB_EXECUTED); @@ -181,10 +182,12 @@ async fn test_scheduled_jobs_race_condition(rt: TestRuntime) -> anyhow::Result<( } #[convex_macro::test_runtime] -async fn test_scheduled_jobs_garbage_collection(rt: TestRuntime) -> anyhow::Result<()> { +async fn test_scheduled_jobs_garbage_collection( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { std::env::set_var("SCHEDULED_JOB_RETENTION", "30"); - let (args, pause_controller) = ApplicationFixtureArgs::with_scheduled_jobs_pause_client(); - let application = Application::new_for_tests_with_args(&rt, args).await?; + let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let hold_guard = pause_controller.hold(SCHEDULED_JOB_EXECUTED); @@ -223,15 +226,21 @@ async fn test_scheduled_jobs_garbage_collection(rt: TestRuntime) -> anyhow::Resu } #[convex_macro::test_runtime] -async fn test_pause_scheduled_jobs(rt: TestRuntime) -> anyhow::Result<()> { - test_scheduled_jobs_helper(rt, BackendState::Paused).await?; +async fn test_pause_scheduled_jobs( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + test_scheduled_jobs_helper(rt, BackendState::Paused, pause_controller).await?; Ok(()) } #[convex_macro::test_runtime] -async fn test_disable_scheduled_jobs(rt: TestRuntime) -> anyhow::Result<()> { - test_scheduled_jobs_helper(rt, BackendState::Disabled).await?; +async fn test_disable_scheduled_jobs( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + test_scheduled_jobs_helper(rt, BackendState::Disabled, pause_controller).await?; Ok(()) } @@ -240,9 +249,9 @@ async fn test_disable_scheduled_jobs(rt: TestRuntime) -> anyhow::Result<()> { async fn test_scheduled_jobs_helper( rt: TestRuntime, backend_state: BackendState, + pause_controller: PauseController, ) -> anyhow::Result<()> { - let (args, pause_controller) = ApplicationFixtureArgs::with_scheduled_jobs_pause_client(); - let application = Application::new_for_tests_with_args(&rt, args).await?; + let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let scheduled_job_executed_hold_guard = pause_controller.hold(SCHEDULED_JOB_EXECUTED); @@ -315,7 +324,6 @@ async fn test_cancel_recursively_scheduled_job(rt: TestRuntime) -> anyhow::Resul FunctionCaller::Action { parent_scheduled_job, }, - PauseClient::new(), ) .await??; @@ -346,9 +354,11 @@ async fn test_cancel_recursively_scheduled_job(rt: TestRuntime) -> anyhow::Resul } #[convex_macro::test_runtime] -async fn test_scheduled_job_retry(rt: TestRuntime) -> anyhow::Result<()> { - let (args, pause_controller) = ApplicationFixtureArgs::with_scheduled_jobs_pause_client(); - let application = Application::new_for_tests_with_args(&rt, args).await?; +async fn test_scheduled_job_retry( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; application.load_udf_tests_modules().await?; let attempt_commit = pause_controller.hold(SCHEDULED_JOB_COMMITTING); diff --git a/crates/async_lru/src/async_lru.rs b/crates/async_lru/src/async_lru.rs index 9c937f26a..9441abf13 100644 --- a/crates/async_lru/src/async_lru.rs +++ b/crates/async_lru/src/async_lru.rs @@ -10,8 +10,6 @@ use std::{ use ::metrics::StatusTimer; use async_broadcast::Receiver as BroadcastReceiver; -#[cfg(test)] -use common::pause::PauseClient; use common::{ codel_queue::{ new_codel_queue_async, @@ -56,7 +54,6 @@ use crate::metrics::{ log_async_lru_size, }; -#[cfg(test)] const PAUSE_DURING_GENERATE_VALUE_LABEL: &str = "generate_value"; /// A write through cache with support for cancelation. @@ -76,15 +73,10 @@ const PAUSE_DURING_GENERATE_VALUE_LABEL: &str = "generate_value"; /// cost is that we have to spawn more value calculating threads and that the /// desired concurrency of the cache may not match that of the caller. pub struct AsyncLru { + runtime: RT, inner: Arc>>, label: &'static str, handle: Arc>, - // This tokio Mutex is safe only because it's stripped out of production - // builds. We shouldn't use tokio locks for prod code (see - // https://github.com/rust-lang/rust/issues/104883 for background and - // https://github.com/get-convex/convex/pull/19307 for an alternative). - #[cfg(test)] - pause_client: Option>>, } pub type SingleValueGenerator = BoxFuture<'static, anyhow::Result>; @@ -93,10 +85,9 @@ pub type ValueGenerator = BoxFuture<'static, HashMap Clone for AsyncLru { fn clone(&self) -> Self { Self { + runtime: self.runtime.clone(), inner: self.inner.clone(), label: self.label, - #[cfg(test)] - pause_client: self.pause_client.clone(), handle: self.handle.clone(), } } @@ -222,27 +213,14 @@ impl< /// concurrency - The number of values that can be concurrently generated. /// This should be set based on system values. pub fn new(rt: RT, max_size: u64, concurrency: usize, label: &'static str) -> Self { - Self::_new( - rt, - LruCache::unbounded(), - max_size, - concurrency, - label, - #[cfg(test)] - None, - ) + Self::_new(rt, LruCache::unbounded(), max_size, concurrency, label) } #[cfg(test)] #[allow(unused)] - fn new_for_tests( - rt: RT, - max_size: u64, - label: &'static str, - pause_client: Option, - ) -> Self { + fn new_for_tests(rt: RT, max_size: u64, label: &'static str) -> Self { let lru = LruCache::unbounded(); - Self::_new(rt, lru, max_size, 1, label, pause_client) + Self::_new(rt, lru, max_size, 1, label) } fn _new( @@ -251,7 +229,6 @@ impl< max_size: u64, concurrency: usize, label: &'static str, - #[cfg(test)] pause_client: Option, ) -> Self { let (tx, rx) = new_codel_queue_async(rt.clone(), 200); let inner = Inner::new(cache, max_size, label, tx); @@ -260,12 +237,10 @@ impl< Self::value_generating_worker_thread(rt.clone(), rx, inner.clone(), concurrency), ); Self { + runtime: rt.clone(), inner, label, handle: Arc::new(handle), - #[cfg(test)] - pause_client: pause_client - .map(|pause_client| Arc::new(tokio::sync::Mutex::new(pause_client))), } } @@ -389,18 +364,14 @@ impl< key: &Key, value_generator: ValueGenerator, ) -> anyhow::Result> { + let pause_client = self.runtime.pause_client(); let status = self.get_sync(key, value_generator)?; tracing::debug!("Getting key {key:?} with status {status}"); match status { Status::Ready(value) => Ok(value), Status::Waiting(rx) => Ok(Self::wait_for_value(key, rx).await?), Status::Kickoff(rx, timer) => { - #[cfg(test)] - if let Some(pause_client) = &mut self.pause_client.clone() { - let pause_client = pause_client.lock().await; - pause_client.wait(PAUSE_DURING_GENERATE_VALUE_LABEL).await; - drop(pause_client); - } + pause_client.wait(PAUSE_DURING_GENERATE_VALUE_LABEL).await; let result = Self::wait_for_value(key, rx).await?; timer.finish(); Ok(result) @@ -692,9 +663,9 @@ mod tests { #[convex_macro::test_runtime] async fn get_when_canceled_during_calculate_returns_value( rt: TestRuntime, + pause: PauseController, ) -> anyhow::Result<()> { - let (pause, pause_client) = PauseController::new(); - let cache = AsyncLru::new_for_tests(rt, 1, "label", Some(pause_client)); + let cache = AsyncLru::new_for_tests(rt, 1, "label"); let hold_guard = pause.hold(PAUSE_DURING_GENERATE_VALUE_LABEL); let mut first = cache .get("key", GenerateRandomValue::generate_value("key").boxed()) @@ -730,13 +701,13 @@ mod tests { #[convex_macro::test_runtime] async fn size_is_zero_initially(rt: TestRuntime) { - let cache: AsyncLru = AsyncLru::new_for_tests(rt, 1, "label", None); + let cache: AsyncLru = AsyncLru::new_for_tests(rt, 1, "label"); assert_eq!(0, cache.size()); } #[convex_macro::test_runtime] async fn size_increases_on_put(rt: TestRuntime) -> anyhow::Result<()> { - let cache = AsyncLru::new_for_tests(rt, 2, "label", None); + let cache = AsyncLru::new_for_tests(rt, 2, "label"); cache .get("key1", GenerateRandomValue::generate_value("key1").boxed()) .await?; @@ -750,7 +721,7 @@ mod tests { #[convex_macro::test_runtime] async fn size_with_custom_size_increases_on_put(rt: TestRuntime) -> anyhow::Result<()> { - let cache = AsyncLru::new_for_tests(rt, 4, "label", None); + let cache = AsyncLru::new_for_tests(rt, 4, "label"); cache .get("key1", GenerateSizeTwoValue::generate_value("key1").boxed()) .await?; @@ -764,7 +735,7 @@ mod tests { #[convex_macro::test_runtime] async fn size_does_not_increase_on_get(rt: TestRuntime) -> anyhow::Result<()> { - let cache = AsyncLru::new_for_tests(rt, 1, "label", None); + let cache = AsyncLru::new_for_tests(rt, 1, "label"); cache .get("key", GenerateRandomValue::generate_value("key").boxed()) .await?; @@ -777,7 +748,7 @@ mod tests { #[convex_macro::test_runtime] async fn size_with_custom_size_does_not_increase_on_get(rt: TestRuntime) -> anyhow::Result<()> { - let cache = AsyncLru::new_for_tests(rt, 2, "label", None); + let cache = AsyncLru::new_for_tests(rt, 2, "label"); cache .get("key", GenerateSizeTwoValue::generate_value("key").boxed()) .await?; @@ -790,7 +761,7 @@ mod tests { #[convex_macro::test_runtime] async fn size_when_value_size_changes_is_consistent(rt: TestRuntime) -> anyhow::Result<()> { - let cache = AsyncLru::new_for_tests(rt, 1, "label", None); + let cache = AsyncLru::new_for_tests(rt, 1, "label"); let value = cache .get( "key", diff --git a/crates/common/src/runtime/mod.rs b/crates/common/src/runtime/mod.rs index fbf965edb..162ddb9bd 100644 --- a/crates/common/src/runtime/mod.rs +++ b/crates/common/src/runtime/mod.rs @@ -67,6 +67,7 @@ use value::heap_size::HeapSize; use crate::{ errors::recapture_stacktrace, is_canceled::IsCanceled, + pause::PauseClient, types::Timestamp, }; @@ -234,6 +235,8 @@ pub trait Runtime: Clone + Sync + Send + 'static { fn generate_timestamp(&self) -> anyhow::Result { Timestamp::try_from(self.system_time()) } + + fn pause_client(&self) -> PauseClient; } /// Abstraction over a unix timestamp. Internally it stores a Duration since the diff --git a/crates/common/src/runtime/testing/mod.rs b/crates/common/src/runtime/testing/mod.rs index 0de2a5569..7bdcaa098 100644 --- a/crates/common/src/runtime/testing/mod.rs +++ b/crates/common/src/runtime/testing/mod.rs @@ -43,6 +43,7 @@ use super::{ Runtime, SpawnHandle, }; +use crate::pause::PauseClient; pub static CONVEX_EPOCH: LazyLock = LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(1620198000)); // May 5th, 2021 :) @@ -50,6 +51,7 @@ pub static CONVEX_EPOCH: LazyLock = pub struct TestDriver { tokio_runtime: Option, state: Arc>, + pause_client: PauseClient, } impl TestDriver { @@ -58,6 +60,14 @@ impl TestDriver { } pub fn new_with_seed(seed: u64) -> Self { + Self::new_with_config(seed, PauseClient::new()) + } + + pub fn new_with_pause_client(pause_client: PauseClient) -> Self { + Self::new_with_config(0, pause_client) + } + + pub fn new_with_config(seed: u64, pause_client: PauseClient) -> Self { let tokio_seed = RngSeed::from_bytes(&seed.to_le_bytes()); let tokio_runtime = Builder::new_current_thread() .enable_time() @@ -74,6 +84,7 @@ impl TestDriver { Self { tokio_runtime: Some(tokio_runtime), state: Arc::new(Mutex::new(TestRuntimeState { rng, creation_time })), + pause_client, } } @@ -86,6 +97,7 @@ impl TestDriver { .handle() .clone(), state: Arc::downgrade(&self.state), + pause_client: self.pause_client.clone(), } } @@ -116,6 +128,7 @@ struct TestRuntimeState { pub struct TestRuntime { tokio_handle: tokio::runtime::Handle, state: Weak>, + pause_client: PauseClient, } impl TestRuntime { @@ -177,6 +190,10 @@ impl Runtime for TestRuntime { fn rng(&self) -> Box { Box::new(TestRng { rt: self.clone() }) } + + fn pause_client(&self) -> PauseClient { + self.pause_client.clone() + } } struct TestRng { diff --git a/crates/convex_macro/src/lib.rs b/crates/convex_macro/src/lib.rs index a4f025e9d..357539255 100644 --- a/crates/convex_macro/src/lib.rs +++ b/crates/convex_macro/src/lib.rs @@ -83,6 +83,33 @@ pub fn test_runtime(_attr: TokenStream, item: TokenStream) -> TokenStream { let Some(FnArg::Typed(_)) = args.first() else { panic!("#[test_runtime] requires `{name}` to have `rt: TestRuntime` as the first arg"); }; + let is_pauseable = if let Some(arg1) = args.get(1) { + assert!( + matches!(arg1, FnArg::Typed(pat) if matches!(&*pat.ty, syn::Type::Path(p) if p.path.is_ident("PauseController"))) + ); + true + } else { + false + }; + let run_test = if is_pauseable { + quote! { + let (__pause_controller, __pause_client) = ::common::pause::PauseController::new(); + let mut __test_driver = ::runtime::testing::TestDriver::new_with_pause_client( + __pause_client + ); + let rt = __test_driver.rt(); + let test_future = #name(rt, __pause_controller); + __test_driver.run_until(test_future) + } + } else { + quote! { + let mut __test_driver = ::runtime::testing::TestDriver::new(); + let rt = __test_driver.rt(); + let test_future = #name(rt); + __test_driver.run_until(test_future) + } + }; + let attrs = ast.attrs.iter(); let gen = quote! { #[test] @@ -94,10 +121,7 @@ pub fn test_runtime(_attr: TokenStream, item: TokenStream) -> TokenStream { *::common::knobs::RUNTIME_STACK_SIZE); let handler = builder .spawn(|| { - let mut __test_driver = ::runtime::testing::TestDriver::new(); - let rt = __test_driver.rt(); - let test_future = #name(rt); - __test_driver.run_until(test_future) + #run_test }) .unwrap(); handler.join().unwrap() diff --git a/crates/database/src/committer.rs b/crates/database/src/committer.rs index e6d14152b..f6387170f 100644 --- a/crates/database/src/committer.rs +++ b/crates/database/src/committer.rs @@ -493,7 +493,8 @@ impl Committer { // checkpointed a TableSummarySnapshot. // Walk any changes since the last checkpoint, and update the snapshot manager // with the new TableSummarySnapshot. - let bootstrap_result = table_summary::bootstrap::( + let bootstrap_result = table_summary::bootstrap( + self.runtime.clone(), self.persistence.reader(), self.retention_validator.clone(), latest_ts, diff --git a/crates/database/src/database.rs b/crates/database/src/database.rs index d29723899..b5e822f4b 100644 --- a/crates/database/src/database.rs +++ b/crates/database/src/database.rs @@ -55,7 +55,6 @@ use common::{ }, interval::Interval, knobs::DEFAULT_DOCUMENTS_PAGE_SIZE, - pause::PauseClient, persistence::{ new_idle_repeatable_ts, ConflictStrategy, @@ -286,7 +285,8 @@ struct ListSnapshotTableIteratorCacheEntry { } #[derive(Clone)] -pub struct DatabaseSnapshot { +pub struct DatabaseSnapshot { + runtime: RT, ts: RepeatableTimestamp, pub bootstrap_metadata: BootstrapMetadata, pub snapshot: Snapshot, @@ -339,7 +339,7 @@ pub struct BootstrapMetadata { pub index_tablet_id: TabletId, } -impl DatabaseSnapshot { +impl DatabaseSnapshot { pub async fn max_ts(reader: &dyn PersistenceReader) -> anyhow::Result { reader .max_ts() @@ -469,13 +469,13 @@ impl DatabaseSnapshot { Ok(table_registry) } - pub fn table_iterator(&self) -> TableIterator { + pub fn table_iterator(&self) -> TableIterator { TableIterator::new( + self.runtime.clone(), self.timestamp(), self.persistence_reader.clone(), self.retention_validator.clone(), 1000, - None, ) } @@ -544,7 +544,8 @@ impl DatabaseSnapshot { } #[minitrace::trace] - pub async fn load( + pub async fn load( + runtime: RT, persistence: Arc, snapshot: RepeatableTimestamp, retention_validator: Arc, @@ -620,6 +621,7 @@ impl DatabaseSnapshot { .await?; let component_registry = ComponentRegistry::bootstrap(&table_mapping, component_docs)?; Ok(Self { + runtime, ts: persistence_snapshot.timestamp(), bootstrap_metadata, snapshot: Snapshot { @@ -647,9 +649,10 @@ impl DatabaseSnapshot { /// But for tools like `db-info` or `db-verifier`, we want the table /// summaries to be loaded (and can't rely on TableSummaryWorker + /// committer in these services). - pub async fn load_table_summaries(&mut self) -> anyhow::Result<()> { + pub async fn load_table_summaries(&mut self) -> anyhow::Result<()> { tracing::info!("Bootstrapping table summaries..."); - let (table_summary_snapshot, summaries_num_rows) = table_summary::bootstrap::( + let (table_summary_snapshot, summaries_num_rows) = table_summary::bootstrap( + self.runtime.clone(), self.persistence_reader.clone(), self.retention_validator.clone(), self.ts, @@ -810,7 +813,7 @@ impl Database { // Get the latest timestamp to perform the load at. let snapshot_ts = new_idle_repeatable_ts(persistence.as_ref(), &runtime).await?; - let original_max_ts = DatabaseSnapshot::max_ts(&*reader).await?; + let original_max_ts = DatabaseSnapshot::::max_ts(&*reader).await?; let follower_retention_manager = FollowerRetentionManager::new_with_repeatable_ts( runtime.clone(), @@ -819,19 +822,21 @@ impl Database { ) .await?; - let db_snapshot = DatabaseSnapshot::load::( + let db_snapshot = DatabaseSnapshot::load( + runtime.clone(), reader.clone(), snapshot_ts, Arc::new(follower_retention_manager.clone()), ) .await?; - let max_ts = DatabaseSnapshot::max_ts(&*reader).await?; + let max_ts = DatabaseSnapshot::::max_ts(&*reader).await?; anyhow::ensure!( original_max_ts == max_ts, "race while loading DatabaseSnapshot: max ts {original_max_ts} at start, {max_ts} at \ end", ); let DatabaseSnapshot { + runtime: _, bootstrap_metadata, persistence_snapshot: _, ts, @@ -902,11 +907,8 @@ impl Database { tracing::info!("Set search storage to {search_storage:?}"); } - pub fn start_search_and_vector_bootstrap( - &self, - pause_client: PauseClient, - ) -> Box { - let worker = self.new_search_and_vector_bootstrap_worker(pause_client); + pub fn start_search_and_vector_bootstrap(&self) -> Box { + let worker = self.new_search_and_vector_bootstrap_worker(); self.runtime .spawn("search_and_vector_bootstrap", async move { worker.start().await @@ -921,13 +923,10 @@ impl Database { pub fn new_search_and_vector_bootstrap_worker_for_testing( &self, ) -> SearchIndexBootstrapWorker { - self.new_search_and_vector_bootstrap_worker(PauseClient::new()) + self.new_search_and_vector_bootstrap_worker() } - fn new_search_and_vector_bootstrap_worker( - &self, - pause_client: PauseClient, - ) -> SearchIndexBootstrapWorker { + fn new_search_and_vector_bootstrap_worker(&self) -> SearchIndexBootstrapWorker { let (ts, snapshot) = self.snapshot_manager.lock().latest(); let vector_persistence = RepeatablePersistence::new(self.reader.clone(), ts, self.retention_validator()); @@ -938,7 +937,6 @@ impl Database { vector_persistence, table_mapping, self.committer.clone(), - pause_client, ) } @@ -993,16 +991,15 @@ impl Database { &self, snapshot_ts: RepeatableTimestamp, page_size: usize, - pause_client: Option, - ) -> TableIterator { + ) -> TableIterator { let retention_validator = self.retention_validator(); let persistence = self.reader.clone(); TableIterator::new( + self.runtime.clone(), snapshot_ts, persistence, retention_validator, page_size, - pause_client, ) } @@ -1021,7 +1018,7 @@ impl Database { self, ts: RepeatableTimestamp, ) -> anyhow::Result { - let table_iterator = self.table_iterator(ts, 100, None); + let table_iterator = self.table_iterator(ts, 100); let (_, snapshot) = self.snapshot_manager.lock().latest(); let tables_tablet_id = snapshot .table_registry @@ -1065,7 +1062,7 @@ impl Database { self, ts: RepeatableTimestamp, ) -> anyhow::Result> { - let table_iterator = self.table_iterator(ts, 100, None); + let table_iterator = self.table_iterator(ts, 100); let (_, snapshot) = self.snapshot_manager.lock().latest(); let index_tablet_id = snapshot.index_registry.index_table(); let index_by_id = snapshot @@ -1100,7 +1097,7 @@ impl Database { self, ts: RepeatableTimestamp, ) -> anyhow::Result> { - let table_iterator = self.table_iterator(ts, 100, None); + let table_iterator = self.table_iterator(ts, 100); let (_, snapshot) = self.snapshot_manager.lock().latest(); let component_tablet_id = snapshot .table_registry @@ -1351,7 +1348,6 @@ impl Database { mut backoff: Backoff, usage: FunctionUsageTracker, is_retriable: R, - pause_client: PauseClient, write_source: impl Into, f: F, ) -> anyhow::Result<(Timestamp, T, OccRetryStats)> @@ -1366,6 +1362,7 @@ impl Database { let mut tx = self .begin_with_usage(identity.clone(), usage.clone()) .await?; + let pause_client = self.runtime.pause_client(); pause_client.wait("retry_tx_loop_start").await; let start = Instant::now(); let result = async { @@ -1413,7 +1410,6 @@ impl Database { &'a self, identity: Identity, usage: FunctionUsageTracker, - pause_client: PauseClient, write_source: impl Into, f: F, ) -> anyhow::Result<(Timestamp, T, OccRetryStats)> @@ -1429,7 +1425,6 @@ impl Database { backoff, usage, is_retriable, - pause_client, write_source, f, ) @@ -1444,7 +1439,6 @@ impl Database { &'a self, identity: Identity, usage: FunctionUsageTracker, - pause_client: PauseClient, write_source: impl Into, f: F, ) -> anyhow::Result<(Timestamp, T, OccRetryStats)> @@ -1460,7 +1454,6 @@ impl Database { backoff, usage, is_retriable, - pause_client, write_source, f, ) @@ -1837,7 +1830,7 @@ impl Database { let (_, ds) = cached.take().unwrap(); ds } else { - let table_iterator = self.table_iterator(snapshot, 100, None); + let table_iterator = self.table_iterator(snapshot, 100); table_iterator .stream_documents_in_table(tablet_id, by_id, resolved_cursor) .boxed() diff --git a/crates/database/src/index_worker.rs b/crates/database/src/index_worker.rs index ac2f5f252..5e0ae1e78 100644 --- a/crates/database/src/index_worker.rs +++ b/crates/database/src/index_worker.rs @@ -720,11 +720,11 @@ impl IndexWriter { tablet_id: TabletId, ) -> anyhow::Result<()> { let table_iterator = TableIterator::new( + self.runtime.clone(), snapshot_ts, self.reader.clone(), self.retention_validator.clone(), *INDEX_BACKFILL_CHUNK_SIZE, - None, ); let by_id = index_registry.must_get_by_id(tablet_id)?.id(); diff --git a/crates/database/src/index_workers/search_flusher.rs b/crates/database/src/index_workers/search_flusher.rs index 4530ca544..18aea18b1 100644 --- a/crates/database/src/index_workers/search_flusher.rs +++ b/crates/database/src/index_workers/search_flusher.rs @@ -13,8 +13,6 @@ use std::{ }; use anyhow::Context; -#[cfg(any(test, feature = "testing"))] -use common::pause::PauseClient; use common::{ knobs::{ DATABASE_WORKERS_MAX_CHECKPOINT_AGE, @@ -87,15 +85,12 @@ use crate::{ Token, }; -#[cfg(any(test, feature = "testing"))] pub(crate) const FLUSH_RUNNING_LABEL: &str = "flush_running"; pub struct SearchFlusher { params: Params, writer: SearchIndexMetadataWriter, _config: PhantomData, - #[cfg(any(test, feature = "testing"))] - pause_client: Option, } impl Deref for SearchFlusher { @@ -145,7 +140,6 @@ impl SearchFlusher { limits: SearchIndexLimits, writer: SearchIndexMetadataWriter, build_args: T::BuildIndexArgs, - #[cfg(any(test, feature = "testing"))] pause_client: Option, ) -> Self { Self { params: Params { @@ -158,8 +152,6 @@ impl SearchFlusher { }, writer, _config: PhantomData, - #[cfg(any(test, feature = "testing"))] - pause_client, } } @@ -184,10 +176,8 @@ impl SearchFlusher { tracing::info!("{num_to_build} {index_type} indexes to build"); } - #[cfg(any(test, feature = "testing"))] - if let Some(pause_client) = &mut self.pause_client { - pause_client.wait(FLUSH_RUNNING_LABEL).await; - } + let pause_client = self.database.runtime().pause_client(); + pause_client.wait(FLUSH_RUNNING_LABEL).await; for job in to_build { task::consume_budget().await; @@ -524,7 +514,7 @@ impl SearchFlusher { } => { let documents = params .database - .table_iterator(backfill_snapshot_ts, *VECTOR_INDEX_WORKER_PAGE_SIZE, None) + .table_iterator(backfill_snapshot_ts, *VECTOR_INDEX_WORKER_PAGE_SIZE) .stream_documents_in_table(*index_name.table(), by_id, cursor) .boxed() .scan(0_u64, |total_size, res| { diff --git a/crates/database/src/search_index_bootstrap.rs b/crates/database/src/search_index_bootstrap.rs index d93380151..db786ca47 100644 --- a/crates/database/src/search_index_bootstrap.rs +++ b/crates/database/src/search_index_bootstrap.rs @@ -24,7 +24,6 @@ use common::{ document::ParsedDocument, errors::report_error, knobs::UDF_EXECUTOR_OCC_MAX_RETRIES, - pause::PauseClient, persistence::{ RepeatablePersistence, TimestampRange, @@ -94,7 +93,6 @@ pub struct SearchIndexBootstrapWorker { table_mapping: NamespacedTableMapping, committer_client: CommitterClient, backoff: Backoff, - pause_client: PauseClient, } const INITIAL_BACKOFF: Duration = Duration::from_millis(10); @@ -455,7 +453,6 @@ impl SearchIndexBootstrapWorker { persistence: RepeatablePersistence, table_mapping: NamespacedTableMapping, committer_client: CommitterClient, - pause_client: PauseClient, ) -> Self { Self { runtime, @@ -464,7 +461,6 @@ impl SearchIndexBootstrapWorker { persistence, committer_client, backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), - pause_client, } } @@ -501,7 +497,8 @@ impl SearchIndexBootstrapWorker { async fn run(&mut self) -> anyhow::Result<()> { let bootstrapped_indexes = self.bootstrap().await?; - self.pause_client.wait(FINISHED_BOOTSTRAP_UPDATES).await; + let pause_client = self.runtime.pause_client(); + pause_client.wait(FINISHED_BOOTSTRAP_UPDATES).await; self.committer_client .finish_search_and_vector_bootstrap( bootstrapped_indexes, diff --git a/crates/database/src/table_iteration.rs b/crates/database/src/table_iteration.rs index 2c2581301..53511f7b4 100644 --- a/crates/database/src/table_iteration.rs +++ b/crates/database/src/table_iteration.rs @@ -18,7 +18,6 @@ use common::{ }, interval::Interval, knobs::DOCUMENTS_IN_MEMORY, - pause::PauseClient, persistence::{ new_static_repeatable_recent, PersistenceReader, @@ -30,6 +29,7 @@ use common::{ CursorPosition, Order, }, + runtime::Runtime, try_chunks::TryChunksExt, types::{ IndexId, @@ -88,28 +88,27 @@ fn cursor_has_walked(cursor: Option<&CursorPosition>, key: &IndexKeyBytes) -> bo } } -pub struct TableIterator { +pub struct TableIterator { + runtime: RT, persistence: Arc, retention_validator: Arc, page_size: usize, - pause_client: PauseClient, snapshot_ts: RepeatableTimestamp, } -impl TableIterator { +impl TableIterator { pub fn new( + runtime: RT, snapshot_ts: RepeatableTimestamp, persistence: Arc, retention_validator: Arc, page_size: usize, - pause_client: Option, ) -> Self { - let pause_client = pause_client.unwrap_or_default(); Self { + runtime, persistence, retention_validator, page_size, - pause_client, snapshot_ts, } } @@ -171,7 +170,8 @@ impl TableIterator { let mut skipped_keys = IterationDocuments::default(); loop { - self.pause_client.wait("before_index_page").await; + let pause_client = self.runtime.pause_client(); + pause_client.wait("before_index_page").await; let page_start = cursor.index_key.clone(); let (page, new_end_ts) = self.fetch_page(index_id, tablet_id, &mut cursor).await?; anyhow::ensure!(*new_end_ts >= end_ts); @@ -638,7 +638,7 @@ mod tests { .enabled_index_metadata(TableNamespace::test_user(), &by_id)? .unwrap(); database.commit(tx).await?; - let iterator = database.table_iterator(database.now_ts_for_reads(), 2, None); + let iterator = database.table_iterator(database.now_ts_for_reads(), 2); let tablet_id = table_mapping.id(&table_name)?.tablet_id; let revision_stream = iterator.stream_documents_in_table( tablet_id, @@ -662,6 +662,7 @@ mod tests { table_name: TableName, initial: Vec, update_batches: Vec>, + pause: PauseController, ) -> anyhow::Result<()> { let database = new_test_database(runtime.clone()).await; let mut objects = BTreeMap::new(); @@ -683,10 +684,9 @@ mod tests { .unwrap(); database.commit(tx).await?; - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold("before_index_page"); let snapshot_ts = database.now_ts_for_reads(); - let iterator = database.table_iterator(snapshot_ts, 2, Some(pause_client)); + let iterator = database.table_iterator(snapshot_ts, 2); let tablet_id = table_mapping.id(&table_name)?.tablet_id; let revision_stream = iterator.stream_documents_in_table(tablet_id, by_id_metadata.id().internal_id(), None); @@ -776,12 +776,13 @@ mod tests { } #[convex_macro::test_runtime] - async fn test_deleted(rt: TestRuntime) -> anyhow::Result<()> { + async fn test_deleted(rt: TestRuntime, pause: PauseController) -> anyhow::Result<()> { racing_commits_test( rt, "A".parse()?, vec![assert_obj!()], vec![vec![Update::Delete { index: 0 }]], + pause, ) .await } @@ -842,7 +843,7 @@ mod tests { database.commit(tx).await?; database.bump_max_repeatable_ts().await?; - let iterator = database.table_iterator(snapshot_ts, 1, None); + let iterator = database.table_iterator(snapshot_ts, 1); let tablet_id = table_mapping.id(&table_name)?.tablet_id; let revisions: Vec<_> = iterator .stream_documents_in_table_by_index(tablet_id, by_k_id, index_fields, None) @@ -878,9 +879,10 @@ mod tests { initial in small_user_objects(), update_batches in racing_updates(), ) { - let td = TestDriver::new(); + let (pause, pause_client) = PauseController::new(); + let td = TestDriver::new_with_pause_client(pause_client); td.run_until( - racing_commits_test(td.rt(), table_name, initial, update_batches), + racing_commits_test(td.rt(), table_name, initial, update_batches, pause), ).unwrap(); } diff --git a/crates/database/src/table_summary.rs b/crates/database/src/table_summary.rs index 45b61bfbf..1a1954938 100644 --- a/crates/database/src/table_summary.rs +++ b/crates/database/src/table_summary.rs @@ -5,8 +5,6 @@ use std::{ sync::Arc, }; -#[cfg(any(test, feature = "testing"))] -use common::pause::PauseClient; use common::{ document::ResolvedDocument, persistence::{ @@ -304,11 +302,7 @@ impl TableSummaryWriter { } #[cfg(any(test, feature = "testing"))] - pub async fn compute_snapshot( - &self, - pause_client: Option, - page_size: usize, - ) -> anyhow::Result { + pub async fn compute_snapshot(&self, page_size: usize) -> anyhow::Result { let mut tx = self.database.begin(Identity::system()).await?; let start_ts = tx.begin_timestamp(); let table_mapping = tx.table_mapping().clone(); @@ -317,12 +311,12 @@ impl TableSummaryWriter { let snapshot_ts = self.database.now_ts_for_reads(); - let pause_client = pause_client.unwrap_or_default(); + let pause_client = self.database.runtime().pause_client(); pause_client.wait("table_summary_snapshot_picked").await; let database = self.database.clone(); Self::collect_snapshot( *start_ts, - move || database.table_iterator(snapshot_ts, page_size, None), + move || database.table_iterator(snapshot_ts, page_size), &table_mapping, &by_id_indexes, ) @@ -333,7 +327,7 @@ impl TableSummaryWriter { // table_iterator, table_mapping, and by_id_indexes should all be // computed at the same snapshot. snapshot_ts: Timestamp, - table_iterator: impl Fn() -> TableIterator, + table_iterator: impl Fn() -> TableIterator, table_mapping: &TableMapping, by_id_indexes: &BTreeMap, ) -> anyhow::Result { @@ -379,7 +373,8 @@ impl TableSummaryWriter { async fn compute(&self, bootstrap_kind: BootstrapKind) -> anyhow::Result { let reader = self.persistence.reader(); let upper_bound = self.database.now_ts_for_reads(); - let (new_snapshot, _) = bootstrap::( + let (new_snapshot, _) = bootstrap( + self.database.runtime().clone(), reader, self.retention_validator.clone(), upper_bound, @@ -421,6 +416,7 @@ pub fn table_summary_bootstrapping_error(msg: Option<&'static str>) -> anyhow::E /// * The new table summary snapshot /// * The number of log entries processed pub async fn bootstrap( + runtime: RT, persistence: Arc, retention_validator: Arc, target_ts: RepeatableTimestamp, @@ -432,11 +428,16 @@ pub async fn bootstrap( BootstrapKind::FromCheckpoint => TableSummarySnapshot::load(persistence.as_ref()).await?, }; let recent_ts = new_static_repeatable_recent(persistence.as_ref()).await?; - let (table_mapping, _, index_registry, ..) = DatabaseSnapshot::load_table_and_index_metadata( - &RepeatablePersistence::new(persistence.clone(), recent_ts, retention_validator.clone()) + let (table_mapping, _, index_registry, ..) = + DatabaseSnapshot::::load_table_and_index_metadata( + &RepeatablePersistence::new( + persistence.clone(), + recent_ts, + retention_validator.clone(), + ) .read_snapshot(recent_ts)?, - ) - .await?; + ) + .await?; let (base_snapshot, base_snapshot_ts) = match stored_snapshot { Some(base) => base, None => { @@ -445,11 +446,11 @@ pub async fn bootstrap( *recent_ts, || { TableIterator::new( + runtime.clone(), recent_ts, persistence.clone(), retention_validator.clone(), 1000, - None, ) }, &table_mapping, @@ -656,6 +657,7 @@ mod tests { // Bootstrap at ts2 by walking by_id, and write the snapshot that later // test cases will use. let (snapshot, _) = bootstrap::( + rt.clone(), persistence.reader(), rv.clone(), ts2, @@ -670,7 +672,8 @@ mod tests { write_snapshot(persistence.as_ref(), &snapshot).await?; // Bootstrap at ts2 by reading the snapshot and returning it. - let (snapshot, walked) = bootstrap::( + let (snapshot, walked) = bootstrap( + rt.clone(), persistence.reader(), rv.clone(), ts2, @@ -685,7 +688,8 @@ mod tests { assert_eq!(snapshot.ts, *ts2); // Bootstrap at ts3 by reading the snapshot and walking forwards. - let (snapshot, walked) = bootstrap::( + let (snapshot, walked) = bootstrap( + rt.clone(), persistence.reader(), rv.clone(), ts3, @@ -700,7 +704,8 @@ mod tests { assert_eq!(snapshot.ts, *ts3); // Bootstrap at ts1 by reading the snapshot and walking backwards. - let (snapshot, walked) = bootstrap::( + let (snapshot, walked) = bootstrap( + rt.clone(), persistence.reader(), rv.clone(), ts1, @@ -715,7 +720,8 @@ mod tests { assert_eq!(snapshot.ts, *ts1); // Bootstrap from scratch at ts3 by walking by_id. - let (snapshot, _) = bootstrap::( + let (snapshot, _) = bootstrap( + rt.clone(), persistence.reader(), rv.clone(), ts3, @@ -778,7 +784,7 @@ mod tests { database, Arc::new(NoopRetentionValidator), ); - let computed = writer.compute_snapshot(None, 2).await?; + let computed = writer.compute_snapshot(2).await?; if !is_empty { let table_id = table_mapping @@ -827,7 +833,7 @@ mod tests { database, Arc::new(NoopRetentionValidator), ); - let computed = writer.compute_snapshot(None, 2).await?; + let computed = writer.compute_snapshot(2).await?; for (table_name, values) in &values { if !values.is_empty() { diff --git a/crates/database/src/test_helpers/db_fixtures.rs b/crates/database/src/test_helpers/db_fixtures.rs index 68daef4d8..b69b5d1a3 100644 --- a/crates/database/src/test_helpers/db_fixtures.rs +++ b/crates/database/src/test_helpers/db_fixtures.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use common::{ - pause::PauseClient, persistence::Persistence, runtime::Runtime, testing::TestPersistence, @@ -93,7 +92,7 @@ impl DbFixtures { .await?; db.set_search_storage(search_storage.clone()); if bootstrap_search_and_vector_indexes { - let mut handle = db.start_search_and_vector_bootstrap(PauseClient::new()); + let mut handle = db.start_search_and_vector_bootstrap(); handle.join().await?; } let build_index_args = BuildTextIndexArgs { diff --git a/crates/database/src/tests/mod.rs b/crates/database/src/tests/mod.rs index 088e820dc..e72cea0a7 100644 --- a/crates/database/src/tests/mod.rs +++ b/crates/database/src/tests/mod.rs @@ -29,7 +29,6 @@ use common::{ }, maybe_val, object_validator, - pause::PauseClient, persistence::{ NoopRetentionValidator, Persistence, @@ -1913,7 +1912,8 @@ async fn test_index_write(rt: TestRuntime) -> anyhow::Result<()> { retention_validator.clone(), rt.clone(), ); - let database_snapshot = DatabaseSnapshot::load::( + let database_snapshot = DatabaseSnapshot::load( + rt.clone(), tp.reader(), unchecked_repeatable_ts(ts), retention_validator, @@ -2077,7 +2077,6 @@ async fn test_retries(rt: TestRuntime) -> anyhow::Result<()> { db.execute_with_occ_retries( Identity::system(), FunctionUsageTracker::new(), - PauseClient::new(), WriteSource::unknown(), |tx| insert(tx).into(), ) @@ -2121,7 +2120,6 @@ async fn test_retries_includes_f(rt: TestRuntime) -> anyhow::Result<()> { Backoff::new(Duration::from_secs(0), Duration::from_millis(10)), FunctionUsageTracker::new(), |e: &anyhow::Error| e.is_overloaded(), - PauseClient::new(), WriteSource::unknown(), |tx| overloaded(tx, receiver.clone()).into(), ) @@ -2139,7 +2137,6 @@ async fn test_retries_includes_f(rt: TestRuntime) -> anyhow::Result<()> { Backoff::new(Duration::from_secs(0), Duration::from_millis(10)), FunctionUsageTracker::new(), |e: &anyhow::Error| e.is_overloaded(), - PauseClient::new(), WriteSource::unknown(), |tx| overloaded(tx, receiver.clone()).into(), ) diff --git a/crates/database/src/tests/randomized_search_tests.rs b/crates/database/src/tests/randomized_search_tests.rs index 876ad2e40..6d6f261f5 100644 --- a/crates/database/src/tests/randomized_search_tests.rs +++ b/crates/database/src/tests/randomized_search_tests.rs @@ -1049,14 +1049,14 @@ async fn search_fails_while_bootstrapping(rt: TestRuntime) -> anyhow::Result<()> /// Test that search works after bootstrapping has finished when there are /// writes in between bootstrap ts and the commit ts. #[convex_macro::test_runtime] -async fn search_works_after_bootstrapping(rt: TestRuntime) -> anyhow::Result<()> { +async fn search_works_after_bootstrapping( + rt: TestRuntime, + pause_controller: PauseController, +) -> anyhow::Result<()> { let scenario = Scenario::new(rt.clone()).await?; - let (pause_controller, pause_client) = PauseController::new(); let hold_guard = pause_controller.hold(FINISHED_BOOTSTRAP_UPDATES); let mut wait_for_blocked = hold_guard.wait_for_blocked().boxed(); - let mut handle = scenario - .database - .start_search_and_vector_bootstrap(pause_client); + let mut handle = scenario.database.start_search_and_vector_bootstrap(); let bootstrap_fut = handle.join().fuse(); pin_mut!(bootstrap_fut); select_biased! { diff --git a/crates/database/src/tests/vector_test_utils.rs b/crates/database/src/tests/vector_test_utils.rs index 384cc16cc..decc4ca9c 100644 --- a/crates/database/src/tests/vector_test_utils.rs +++ b/crates/database/src/tests/vector_test_utils.rs @@ -262,8 +262,7 @@ impl VectorFixtures { self.new_index_flusher_with_full_scan_threshold(*MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB) } - pub async fn run_compaction_during_flush(&self) -> anyhow::Result<()> { - let (pause, pause_client) = PauseController::new(); + pub async fn run_compaction_during_flush(&self, pause: PauseController) -> anyhow::Result<()> { let mut flusher = new_vector_flusher_for_tests( self.rt.clone(), self.db.clone(), @@ -273,7 +272,6 @@ impl VectorFixtures { 0, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, 8, - Some(pause_client), ); let hold_guard = pause.hold(FLUSH_RUNNING_LABEL); let flush = flusher.step(); @@ -302,7 +300,6 @@ impl VectorFixtures { 0, full_scan_threshold_kb, *VECTOR_INDEX_SIZE_SOFT_LIMIT, - None, )) } @@ -319,7 +316,6 @@ impl VectorFixtures { 0, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, incremental_part_threshold, - None, )) } diff --git a/crates/database/src/tests/vector_tests.rs b/crates/database/src/tests/vector_tests.rs index 0be9c9b6b..271dc6f1c 100644 --- a/crates/database/src/tests/vector_tests.rs +++ b/crates/database/src/tests/vector_tests.rs @@ -24,7 +24,6 @@ use common::{ MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, VECTOR_INDEX_SIZE_SOFT_LIMIT, }, - pause::PauseClient, persistence::PersistenceReader, runtime::Runtime, types::{ @@ -134,7 +133,7 @@ impl Scenario { }, ) .await?; - let mut handle = db.start_search_and_vector_bootstrap(PauseClient::new()); + let mut handle = db.start_search_and_vector_bootstrap(); handle.join().await?; let self_ = Self { @@ -777,7 +776,6 @@ async fn test_index_backfill_is_incremental(rt: TestRuntime) -> anyhow::Result<( *VECTOR_INDEX_SIZE_SOFT_LIMIT, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, incremental_index_size, - None, ); let mut backfill_ts = None; @@ -862,7 +860,6 @@ async fn test_incremental_backfill_with_compaction(rt: TestRuntime) -> anyhow::R *VECTOR_INDEX_SIZE_SOFT_LIMIT, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, incremental_index_size, - None, ); for _ in 0..num_parts { diff --git a/crates/database/src/text_index_worker/flusher.rs b/crates/database/src/text_index_worker/flusher.rs index b71d567ad..97295d249 100644 --- a/crates/database/src/text_index_worker/flusher.rs +++ b/crates/database/src/text_index_worker/flusher.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -#[cfg(any(test, feature = "testing"))] -use common::pause::PauseClient; use common::{ knobs::SEARCH_INDEX_SIZE_SOFT_LIMIT, persistence::PersistenceReader, @@ -65,8 +63,6 @@ pub(crate) struct FlusherBuilder { segment_term_metadata_fetcher: Arc, limits: SearchIndexLimits, writer: SearchIndexMetadataWriter, - #[cfg(any(test, feature = "testing"))] - pause_client: Option, } impl FlusherBuilder { @@ -89,8 +85,6 @@ impl FlusherBuilder { index_size_soft_limit: *SEARCH_INDEX_SIZE_SOFT_LIMIT, incremental_multipart_threshold_bytes: *SEARCH_INDEX_SIZE_SOFT_LIMIT, }, - #[cfg(any(test, feature = "testing"))] - pause_client: None, } } @@ -129,8 +123,6 @@ impl FlusherBuilder { search_storage: self.storage.clone(), segment_term_metadata_fetcher: self.segment_term_metadata_fetcher.clone(), }, - #[cfg(any(test, feature = "testing"))] - self.pause_client, ) } } diff --git a/crates/database/src/vector_index_worker/flusher.rs b/crates/database/src/vector_index_worker/flusher.rs index 9a20de32f..a0b7bc97d 100644 --- a/crates/database/src/vector_index_worker/flusher.rs +++ b/crates/database/src/vector_index_worker/flusher.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -#[cfg(any(test, feature = "testing"))] -use common::pause::PauseClient; use common::{ knobs::{ MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, @@ -43,7 +41,6 @@ pub async fn backfill_vector_indexes( /* index_size_soft_limit= */ 0, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, *VECTOR_INDEX_SIZE_SOFT_LIMIT, - None, ); flusher.step().await?; Ok(()) @@ -59,7 +56,6 @@ pub(crate) fn new_vector_flusher_for_tests( index_size_soft_limit: usize, full_scan_segment_max_kb: usize, incremental_multipart_threshold_bytes: usize, - pause_client: Option, ) -> VectorIndexFlusher { use search::metrics::SearchType; let writer = SearchIndexMetadataWriter::new( @@ -84,7 +80,6 @@ pub(crate) fn new_vector_flusher_for_tests( BuildVectorIndexArgs { full_scan_threshold_bytes: full_scan_segment_max_kb, }, - pause_client, ) } @@ -108,8 +103,6 @@ pub(crate) fn new_vector_flusher( BuildVectorIndexArgs { full_scan_threshold_bytes: *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, }, - #[cfg(any(test, feature = "testing"))] - None, ) } @@ -132,6 +125,7 @@ mod tests { MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, VECTOR_INDEX_SIZE_SOFT_LIMIT, }, + pause::PauseController, persistence::PersistenceReader, runtime::Runtime, types::{ @@ -196,7 +190,6 @@ mod tests { soft_limit, *MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB, *VECTOR_INDEX_SIZE_SOFT_LIMIT, - None, )) } @@ -435,7 +428,10 @@ mod tests { } #[convex_macro::test_runtime] - async fn backfilled_concurrent_compaction_and_flush(rt: TestRuntime) -> anyhow::Result<()> { + async fn backfilled_concurrent_compaction_and_flush( + rt: TestRuntime, + pause: PauseController, + ) -> anyhow::Result<()> { let config = CompactionConfig::default(); let min_compaction_segments = config.min_compaction_segments; let config = CompactionConfig { @@ -475,7 +471,7 @@ mod tests { // Run the compactor / flusher concurrently in a way where the compactor // wins the race. - fixtures.run_compaction_during_flush().await?; + fixtures.run_compaction_during_flush(pause).await?; // Verify we propagate the new deletes to the compacted segment and retain our // new segment. @@ -500,6 +496,7 @@ mod tests { #[convex_macro::test_runtime] async fn incremental_index_backfill_concurrent_compaction_and_flush( rt: TestRuntime, + pause: PauseController, ) -> anyhow::Result<()> { let config = CompactionConfig::default(); let min_compaction_segments = config.min_compaction_segments; @@ -525,7 +522,7 @@ mod tests { // For last iteration, run the compactor / flusher concurrently in a way where // the compactor wins the race. - fixtures.run_compaction_during_flush().await?; + fixtures.run_compaction_during_flush(pause).await?; // There should be 2 segments left: the compacted segment and the new segment // from flush @@ -544,6 +541,7 @@ mod tests { #[convex_macro::test_runtime] async fn concurrent_compaction_and_flush_new_segment_propagates_deletes( rt: TestRuntime, + pause: PauseController, ) -> anyhow::Result<()> { let config = CompactionConfig::default(); let min_compaction_segments = config.min_compaction_segments; @@ -582,7 +580,7 @@ mod tests { // Run the compactor / flusher concurrently in a way where the compactor // wins the race. - fixtures.run_compaction_during_flush().await?; + fixtures.run_compaction_during_flush(pause).await?; // Verify we propagate the new deletes to the compacted segment and retain our // new segment. @@ -607,6 +605,7 @@ mod tests { #[convex_macro::test_runtime] async fn concurrent_compaction_and_flush_no_new_segment_propagates_updates_and_deletes( rt: TestRuntime, + pause: PauseController, ) -> anyhow::Result<()> { let config = CompactionConfig::default(); let min_compaction_segments = config.min_compaction_segments; @@ -658,7 +657,7 @@ mod tests { } fixtures.db.commit(tx).await?; - fixtures.run_compaction_during_flush().await?; + fixtures.run_compaction_during_flush(pause).await?; let segments = fixtures.get_segments_metadata(index_name).await?; assert_eq!(1, segments.len()); diff --git a/crates/function_runner/src/in_memory_indexes.rs b/crates/function_runner/src/in_memory_indexes.rs index 13ea0fbb0..ae7cdc443 100644 --- a/crates/function_runner/src/in_memory_indexes.rs +++ b/crates/function_runner/src/in_memory_indexes.rs @@ -294,7 +294,7 @@ impl InMemoryIndexCache { ); let (index_documents, table_documents) = futures::future::try_join(index_documents_fut, table_documents_fut).await?; - let (table_mapping, table_states) = DatabaseSnapshot::table_mapping_and_states( + let (table_mapping, table_states) = DatabaseSnapshot::::table_mapping_and_states( table_documents.map(|doc| doc.try_into()).try_collect()?, ); let index_registry = IndexRegistry::bootstrap( @@ -308,7 +308,7 @@ impl InMemoryIndexCache { table_states, persistence_snapshot.persistence().version(), )?; - DatabaseSnapshot::verify_invariants(&table_registry, &index_registry)?; + DatabaseSnapshot::::verify_invariants(&table_registry, &index_registry)?; let component_tablet = table_mapping .namespace(TableNamespace::Global) .id(&COMPONENTS_TABLE)? diff --git a/crates/function_runner/src/in_process_function_runner.rs b/crates/function_runner/src/in_process_function_runner.rs index c752c4b07..74ec5560d 100644 --- a/crates/function_runner/src/in_process_function_runner.rs +++ b/crates/function_runner/src/in_process_function_runner.rs @@ -22,7 +22,6 @@ use common::{ execution_context::ExecutionContext, http::fetch::FetchClient, log_lines::LogLine, - pause::PauseClient, persistence::PersistenceReader, runtime::{ Runtime, @@ -100,8 +99,6 @@ pub struct InProcessFunctionRunner { // and ApplicationFunctionRunner. action_callbacks: Arc>>>, fetch_client: Arc, - - pause_client: PauseClient, } impl InProcessFunctionRunner { @@ -114,7 +111,6 @@ impl InProcessFunctionRunner { storage: InstanceStorage, database: Database, fetch_client: Arc, - pause_client: PauseClient, ) -> anyhow::Result { // InProcessFunrun is single tenant and thus can use the full capacity. let max_percent_per_client = 100; @@ -128,7 +124,6 @@ impl InProcessFunctionRunner { database, action_callbacks: Arc::new(RwLock::new(None)), fetch_client, - pause_client, }) } } @@ -153,7 +148,8 @@ impl FunctionRunner for InProcessFunctionRunner { FunctionOutcome, FunctionUsageStats, )> { - self.pause_client.wait("run_function").await; + let pause_client = self.database.runtime().pause_client(); + pause_client.wait("run_function").await; let snapshot = self.database.snapshot(ts)?; let table_count_snapshot = Arc::new(snapshot.table_summaries); diff --git a/crates/isolate/src/client.rs b/crates/isolate/src/client.rs index 1d1bcae51..d17743f7e 100644 --- a/crates/isolate/src/client.rs +++ b/crates/isolate/src/client.rs @@ -57,7 +57,6 @@ use common::{ initialize_root_from_parent, EncodedSpan, }, - pause::PauseClient, query_journal::QueryJournal, runtime::{ shutdown_and_join, @@ -177,7 +176,6 @@ use crate::{ // active permits more frequently than that. const ACTIVE_CONCURRENCY_PERMITS_LOG_FREQUENCY: Duration = Duration::from_secs(10); -#[cfg(any(test, feature = "testing"))] pub const PAUSE_RECREATE_CLIENT: &str = "recreate_client"; pub const PAUSE_REQUEST: &str = "pause_request"; pub const NO_AVAILABLE_WORKERS: &str = "There are no available workers to process the request"; @@ -408,7 +406,6 @@ pub struct EnvironmentData { pub struct Request { pub client_id: String, pub inner: RequestType, - pub pause_client: PauseClient, pub parent_trace: EncodedSpan, } @@ -417,7 +414,6 @@ impl Request { Self { client_id, inner, - pause_client: PauseClient::new(), parent_trace, } } @@ -1443,12 +1439,8 @@ pub trait IsolateWorker: Clone + Send + 'static { if last_client_id.get_or_insert_with(|| { req.client_id.clone() }) != &req.client_id { - #[cfg(any(test, feature = "testing"))] - if let Some(pause_client) = &mut self.pause_client() { - let pause_client = pause_client.lock().await; - pause_client.wait(PAUSE_RECREATE_CLIENT).await; - drop(pause_client); - } + let pause_client = self.rt().pause_client(); + pause_client.wait(PAUSE_RECREATE_CLIENT).await; tracing::debug!("Restarting isolate due to client change, previous: {:?}, new: {:?}", last_client_id, req.client_id); metrics::log_recreate_isolate("client_id_changed"); drop(isolate); @@ -1498,8 +1490,6 @@ pub trait IsolateWorker: Clone + Send + 'static { fn config(&self) -> &IsolateConfig; fn rt(&self) -> RT; - #[cfg(any(test, feature = "testing"))] - fn pause_client(&self) -> Option>>; } pub(crate) fn should_recreate_isolate( @@ -1564,21 +1554,23 @@ mod tests { } #[convex_macro::test_runtime] - async fn test_scheduler_workers_limit_requests(rt: TestRuntime) -> anyhow::Result<()> { + async fn test_scheduler_workers_limit_requests( + rt: TestRuntime, + pause1: PauseController, + ) -> anyhow::Result<()> { initialize_v8(); let function_runner_core = IsolateClient::new(rt.clone(), 100, 1, None)?; - let (pause1, pause_client1) = PauseController::new(); let DbFixtures { db, .. } = DbFixtures::new(&rt).await?; let client1 = "client1"; let hold_guard = pause1.hold(PAUSE_REQUEST); let (sender, _rx1) = oneshot::channel(); - let request = bogus_udf_request(&db, client1, Some(pause_client1), sender).await?; + let request = bogus_udf_request(&db, client1, sender).await?; function_runner_core.send_request(request)?; // Pausing a request while being executed should make the next request be // rejected because there are no available workers. let _guard = hold_guard.wait_for_blocked().await.unwrap(); let (sender, rx2) = oneshot::channel(); - let request2 = bogus_udf_request(&db, client1, None, sender).await?; + let request2 = bogus_udf_request(&db, client1, sender).await?; function_runner_core.send_request(request2)?; let response = IsolateClient::::receive_response(rx2).await?; let err = response.unwrap_err(); @@ -1590,43 +1582,45 @@ mod tests { #[convex_macro::test_runtime] async fn test_scheduler_does_not_throttle_different_clients( rt: TestRuntime, + pause1: PauseController, ) -> anyhow::Result<()> { initialize_v8(); let function_runner_core = IsolateClient::new(rt.clone(), 50, 2, None)?; - let (pause1, pause_client1) = PauseController::new(); let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?; let client1 = "client1"; let hold_guard = pause1.hold(PAUSE_REQUEST); let (sender, _rx1) = oneshot::channel(); - let request = bogus_udf_request(&db, client1, Some(pause_client1), sender).await?; + let request = bogus_udf_request(&db, client1, sender).await?; function_runner_core.send_request(request)?; // Pausing a request should not affect the next one because we have 2 workers // and 2 requests from different clients. let _guard = hold_guard.wait_for_blocked().await.unwrap(); let (sender, rx2) = oneshot::channel(); let client2 = "client2"; - let request2 = bogus_udf_request(&db, client2, None, sender).await?; + let request2 = bogus_udf_request(&db, client2, sender).await?; function_runner_core.send_request(request2)?; IsolateClient::::receive_response(rx2).await??; Ok(()) } #[convex_macro::test_runtime] - async fn test_scheduler_throttles_same_client(rt: TestRuntime) -> anyhow::Result<()> { + async fn test_scheduler_throttles_same_client( + rt: TestRuntime, + pause1: PauseController, + ) -> anyhow::Result<()> { initialize_v8(); let function_runner_core = IsolateClient::new(rt.clone(), 50, 2, None)?; - let (pause1, pause_client1) = PauseController::new(); let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?; let client = "client"; let hold_guard = pause1.hold(PAUSE_REQUEST); let (sender, _rx1) = oneshot::channel(); - let request = bogus_udf_request(&db, client, Some(pause_client1), sender).await?; + let request = bogus_udf_request(&db, client, sender).await?; function_runner_core.send_request(request)?; // Pausing the first request and sending a second should make the second fail // because there's only one worker left and it is reserved for other clients. let _guard = hold_guard.wait_for_blocked().await.unwrap(); let (sender, rx2) = oneshot::channel(); - let request2 = bogus_udf_request(&db, client, None, sender).await?; + let request2 = bogus_udf_request(&db, client, sender).await?; function_runner_core.send_request(request2)?; let response = IsolateClient::::receive_response(rx2).await?; let err = response.unwrap_err(); diff --git a/crates/isolate/src/isolate_worker.rs b/crates/isolate/src/isolate_worker.rs index a7f61b41f..faae66fad 100644 --- a/crates/isolate/src/isolate_worker.rs +++ b/crates/isolate/src/isolate_worker.rs @@ -1,9 +1,4 @@ -#[cfg(any(test, feature = "testing"))] -use std::sync::Arc; - use async_trait::async_trait; -#[cfg(any(test, feature = "testing"))] -use common::pause::PauseClient; use common::{ runtime::Runtime, sync::oneshot_receiver_closed, @@ -46,32 +41,11 @@ use crate::{ pub(crate) struct FunctionRunnerIsolateWorker { rt: RT, isolate_config: IsolateConfig, - // This tokio Mutex is safe only because it's stripped out of production - // builds. We shouldn't use tokio locks for prod code (see - // https://github.com/rust-lang/rust/issues/104883 for background and - // https://github.com/get-convex/convex/pull/19307 for an alternative). - #[cfg(any(test, feature = "testing"))] - pause_client: Option>>, } impl FunctionRunnerIsolateWorker { pub(crate) fn new(rt: RT, isolate_config: IsolateConfig) -> Self { - Self { - rt, - isolate_config, - #[cfg(any(test, feature = "testing"))] - pause_client: None, - } - } - - #[cfg(any(test, feature = "testing"))] - #[cfg_attr(not(test), expect(dead_code))] - fn new_for_tests(rt: RT, isolate_config: IsolateConfig, pause_client: PauseClient) -> Self { - Self { - rt, - isolate_config, - pause_client: Some(Arc::new(tokio::sync::Mutex::new(pause_client))), - } + Self { rt, isolate_config } } async fn handle_request_inner( @@ -81,7 +55,6 @@ impl FunctionRunnerIsolateWorker { Request { client_id, inner, - pause_client: _, parent_trace: _, }: Request, heap_stats: SharedIsolateHeapStats, @@ -337,7 +310,8 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { request: Request, heap_stats: SharedIsolateHeapStats, ) -> String { - request.pause_client.wait(PAUSE_REQUEST).await; + let pause_client = self.rt.pause_client(); + pause_client.wait(PAUSE_REQUEST).await; let client_id = request.client_id.clone(); // Set the scope to be tagged with the client_id just for the duration of // handling the request. It would be nice to get sentry::with_scope to work, but @@ -361,11 +335,6 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { fn rt(&self) -> RT { self.rt.clone() } - - #[cfg(any(test, feature = "testing"))] - fn pause_client(&self) -> Option>> { - self.pause_client.clone() - } } #[cfg(test)] @@ -386,24 +355,22 @@ mod tests { #[convex_macro::test_runtime] async fn test_isolate_recreated_with_client_change_function_runner( rt: TestRuntime, + pause: PauseController, ) -> anyhow::Result<()> { let isolate_config = IsolateConfig::default(); - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold(PAUSE_RECREATE_CLIENT); - let worker = - FunctionRunnerIsolateWorker::new_for_tests(rt.clone(), isolate_config, pause_client); + let worker = FunctionRunnerIsolateWorker::new(rt.clone(), isolate_config); test_isolate_recreated_with_client_change(rt, worker, hold_guard).await } #[convex_macro::test_runtime] async fn test_isolate_not_recreated_with_same_client_function_runner( rt: TestRuntime, + pause: PauseController, ) -> anyhow::Result<()> { let isolate_config = IsolateConfig::default(); - let (pause, pause_client) = PauseController::new(); let hold_guard = pause.hold(PAUSE_RECREATE_CLIENT); - let worker = - FunctionRunnerIsolateWorker::new_for_tests(rt.clone(), isolate_config, pause_client); + let worker = FunctionRunnerIsolateWorker::new(rt.clone(), isolate_config); test_isolate_not_recreated_with_same_client(rt, worker, hold_guard).await } } diff --git a/crates/isolate/src/test_helpers.rs b/crates/isolate/src/test_helpers.rs index fb5914d90..a70e81914 100644 --- a/crates/isolate/src/test_helpers.rs +++ b/crates/isolate/src/test_helpers.rs @@ -34,10 +34,7 @@ use common::{ }, log_lines::LogLines, minitrace_helpers::EncodedSpan, - pause::{ - HoldGuard, - PauseClient, - }, + pause::HoldGuard, persistence::Persistence, query_journal::QueryJournal, runtime::{ @@ -296,7 +293,7 @@ impl UdfTest { }, ) .await?; - let mut handle = database.start_search_and_vector_bootstrap(PauseClient::new()); + let mut handle = database.start_search_and_vector_bootstrap(); handle.join().await?; let key_broker = KeyBroker::new(DEV_INSTANCE_NAME, InstanceSecret::try_from(DEV_SECRET)?)?; let modules_storage = Arc::new(LocalDirStorage::new(rt.clone())?); @@ -1445,7 +1442,6 @@ impl ActionCallbacks for UdfTest { pub async fn bogus_udf_request( db: &Database, client_id: &str, - pause_client: Option, sender: oneshot::Sender, FunctionOutcome)>>, ) -> anyhow::Result> { let tx = db.begin_system().await?; @@ -1472,7 +1468,6 @@ pub async fn bogus_udf_request( Ok(Request { client_id: client_id.to_string(), inner, - pause_client: pause_client.unwrap_or_default(), parent_trace: EncodedSpan::empty(), }) } @@ -1510,7 +1505,7 @@ pub async fn test_isolate_recreated_with_client_change PauseClient { + PauseClient::new() + } }