Skip to content

Commit

Permalink
move PauseClient into Runtime (#33309)
Browse files Browse the repository at this point in the history
PauseClient is really a global at the same level as Runtime -- based on the Runtime we might decide to advance time really quickly for tests, make randomness deterministic, start threads deterministically, or pause at specific labels to allow race conditions to be tested.

I was motivated to do this because I was adding new race tests for QueryCache, and I found myself passing PauseClient through many layers of code that already have Runtime and also already have multiple PauseClients. This felt redundant, and I can clean up a lot of code by just passing through the runtime.

One side effect is passing Runtime to a couple structs that don't already have it, like DatabaseSnapshot.

GitOrigin-RevId: f99a8625946fc2356103b5b8e294aa31fc6e0007
  • Loading branch information
ldanilek authored and Convex, Inc. committed Jan 20, 2025
1 parent ec11600 commit 4121cc0
Show file tree
Hide file tree
Showing 46 changed files with 288 additions and 465 deletions.
3 changes: 0 additions & 3 deletions crates/application/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use common::{
PublicFunctionPath,
},
http::ResolvedHostname,
pause::PauseClient,
runtime::Runtime,
types::{
AllowedVisibility,
Expand Down Expand Up @@ -355,7 +354,6 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
identity,
mutation_identifier,
caller,
PauseClient::new(),
)
.await
}
Expand All @@ -381,7 +379,6 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
identity,
mutation_identifier,
caller,
PauseClient::new(),
)
.await
}
Expand Down
10 changes: 1 addition & 9 deletions crates/application/src/application_function_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use common::{
LogLines,
},
minitrace_helpers::EncodedSpan,
pause::PauseClient,
query_journal::QueryJournal,
runtime::{
Runtime,
Expand Down Expand Up @@ -724,7 +723,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
identity: Identity,
mutation_identifier: Option<SessionRequestIdentifier>,
caller: FunctionCaller,
pause_client: PauseClient,
) -> anyhow::Result<Result<MutationReturn, MutationError>> {
let timer = mutation_timer();
let result = self
Expand All @@ -735,7 +733,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
identity,
mutation_identifier,
caller,
pause_client,
)
.await;
match &result {
Expand All @@ -755,7 +752,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
identity: Identity,
mutation_identifier: Option<SessionRequestIdentifier>,
caller: FunctionCaller,
pause_client: PauseClient,
) -> anyhow::Result<Result<MutationReturn, MutationError>> {
if path.is_system() && !(identity.is_admin() || identity.is_system()) {
anyhow::bail!(unauthorized_error("mutation"));
Expand Down Expand Up @@ -788,6 +784,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
.database
.begin_with_usage(identity.clone(), usage_tracker.clone())
.await?;
let pause_client = self.runtime.pause_client();
pause_client.wait("retry_mutation_loop_start").await;
let identity = tx.inert_identity();

Expand Down Expand Up @@ -1888,7 +1885,6 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
FunctionCaller::Action {
parent_scheduled_job: context.parent_scheduled_job,
},
PauseClient::new(),
)
.await
.map(|r| match r {
Expand Down Expand Up @@ -1969,7 +1965,6 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
PauseClient::new(),
"app_funrun_storage_store_file_entry",
|tx| {
async {
Expand Down Expand Up @@ -2001,7 +1996,6 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
PauseClient::new(),
"app_funrun_storage_delete",
|tx| {
async {
Expand Down Expand Up @@ -2032,7 +2026,6 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
PauseClient::new(),
"app_funrun_schedule_job",
|tx| {
let path = scheduled_path.clone();
Expand Down Expand Up @@ -2071,7 +2064,6 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
PauseClient::new(),
"app_funrun_cancel_job",
|tx| {
async {
Expand Down
2 changes: 0 additions & 2 deletions crates/application/src/deploy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use common::{
Resource,
},
errors::JsError,
pause::PauseClient,
runtime::{
Runtime,
UnixTimestamp,
Expand Down Expand Up @@ -243,7 +242,6 @@ impl<RT: Runtime> Application<RT> {
.execute_with_occ_retries(
Identity::system(),
FunctionUsageTracker::new(),
PauseClient::new(),
WriteSource::new("start_push"),
|tx| {
async move {
Expand Down
4 changes: 2 additions & 2 deletions crates/application/src/exports/export_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
let mut table_upload = zip_snapshot_upload
.start_system_table(path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone())
.await?;
let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None);
let table_iterator = worker.database.table_iterator(snapshot_ts, 1000);
let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
pin_mut!(stream);
while let Some((doc, _ts)) = stream.try_next().await? {
Expand All @@ -95,7 +95,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
}
table_upload.complete().await?;

let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None);
let table_iterator = worker.database.table_iterator(snapshot_ts, 1000);
let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
pin_mut!(stream);
while let Some((doc, _ts)) = stream.try_next().await? {
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/exports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn write_table<'a, 'b: 'a, RT: Runtime>(
.start_table(path_prefix, table_name.clone())
.await?;

let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None);
let table_iterator = worker.database.table_iterator(snapshot_ts, 1000);
let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
pin_mut!(stream);

Expand Down
3 changes: 0 additions & 3 deletions crates/application/src/exports/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use common::{
document::ParsedDocument,
errors::report_error,
execution_context::ExecutionId,
pause::PauseClient,
runtime::Runtime,
types::UdfIdentifier,
RequestId,
Expand Down Expand Up @@ -208,7 +207,6 @@ impl<RT: Runtime> ExportWorker<RT> {
.execute_with_occ_retries(
Identity::system(),
FunctionUsageTracker::new(),
PauseClient::new(),
"export_worker_update_progress",
move |tx| {
let msg = msg.clone();
Expand Down Expand Up @@ -272,7 +270,6 @@ impl<RT: Runtime> ExportWorker<RT> {
.execute_with_occ_retries(
Identity::system(),
FunctionUsageTracker::new(),
PauseClient::new(),
"export_worker_mark_complete",
|tx| {
let object_key = object_key.clone();
Expand Down
29 changes: 6 additions & 23 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ use common::{
log_lines::LogLines,
log_streaming::LogSender,
paths::FieldPath,
pause::PauseClient,
persistence::Persistence,
query_journal::QueryJournal,
runtime::{
Expand Down Expand Up @@ -554,8 +553,6 @@ impl<RT: Runtime> Application<RT> {
node_actions: Actions<RT>,
log_sender: Arc<dyn LogSender>,
log_visibility: Arc<dyn LogVisibility<RT>>,
snapshot_import_pause_client: PauseClient,
scheduled_jobs_pause_client: PauseClient,
app_auth: Arc<ApplicationAuth>,
cache: QueryCache,
) -> anyhow::Result<Self> {
Expand Down Expand Up @@ -588,9 +585,8 @@ impl<RT: Runtime> Application<RT> {
segment_term_metadata_fetcher,
);
let search_worker = Arc::new(Mutex::new(search_worker));
let search_and_vector_bootstrap_worker = Arc::new(Mutex::new(
database.start_search_and_vector_bootstrap(PauseClient::new()),
));
let search_and_vector_bootstrap_worker =
Arc::new(Mutex::new(database.start_search_and_vector_bootstrap()));
let table_summary_worker =
TableSummaryWorker::start(runtime.clone(), database.clone(), persistence.clone());
let schema_worker = Arc::new(Mutex::new(runtime.spawn(
Expand Down Expand Up @@ -633,7 +629,6 @@ impl<RT: Runtime> Application<RT> {
database.clone(),
runner.clone(),
function_log.clone(),
scheduled_jobs_pause_client,
);

let cron_job_executor_fut = CronJobExecutor::start(
Expand Down Expand Up @@ -663,7 +658,6 @@ impl<RT: Runtime> Application<RT> {
snapshot_imports_storage.clone(),
file_storage.clone(),
database.usage_counter().clone(),
snapshot_import_pause_client,
);
let snapshot_import_worker = Arc::new(Mutex::new(
runtime.spawn("snapshot_import_worker", snapshot_import_worker),
Expand Down Expand Up @@ -996,7 +990,6 @@ impl<RT: Runtime> Application<RT> {
// Identifier used to make this mutation idempotent.
mutation_identifier: Option<SessionRequestIdentifier>,
caller: FunctionCaller,
pause_client: PauseClient,
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> {
identity.ensure_can_run_function(UdfType::Mutation)?;
let block_logging = self
Expand All @@ -1016,7 +1009,6 @@ impl<RT: Runtime> Application<RT> {
identity,
mutation_identifier,
caller,
pause_client,
)
.await
{
Expand Down Expand Up @@ -1258,7 +1250,6 @@ impl<RT: Runtime> Application<RT> {
identity,
None,
caller,
PauseClient::new(),
)
.await
.map(|res| {
Expand Down Expand Up @@ -2817,7 +2808,6 @@ impl<RT: Runtime> Application<RT> {
{
self.execute_with_audit_log_events_and_occ_retries_with_pause_client(
identity,
PauseClient::new(),
write_source,
f,
)
Expand All @@ -2841,7 +2831,6 @@ impl<RT: Runtime> Application<RT> {
{
self.execute_with_audit_log_events_and_occ_retries_with_pause_client(
identity,
PauseClient::new(),
write_source,
f,
)
Expand All @@ -2851,7 +2840,6 @@ impl<RT: Runtime> Application<RT> {
pub async fn execute_with_audit_log_events_and_occ_retries_with_pause_client<'a, F, T>(
&self,
identity: Identity,
pause_client: PauseClient,
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<(T, OccRetryStats)>
Expand All @@ -2865,13 +2853,9 @@ impl<RT: Runtime> Application<RT> {
{
let db = self.database.clone();
let (ts, (t, events), stats) = db
.execute_with_occ_retries(
identity,
FunctionUsageTracker::new(),
pause_client,
write_source,
|tx| Self::insert_deployment_audit_log_events(tx, &f).into(),
)
.execute_with_occ_retries(identity, FunctionUsageTracker::new(), write_source, |tx| {
Self::insert_deployment_audit_log_events(tx, &f).into()
})
.await?;
// Send deployment audit logs
// TODO CX-5139 Remove this when audit logs are being processed in LogManager.
Expand All @@ -2890,7 +2874,6 @@ impl<RT: Runtime> Application<RT> {
&'a self,
identity: Identity,
usage: FunctionUsageTracker,
pause_client: PauseClient,
write_source: impl Into<WriteSource>,
f: F,
) -> anyhow::Result<(Timestamp, T)>
Expand All @@ -2900,7 +2883,7 @@ impl<RT: Runtime> Application<RT> {
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'b, 'a, anyhow::Result<T>>,
{
self.database
.execute_with_occ_retries(identity, usage, pause_client, write_source, f)
.execute_with_occ_retries(identity, usage, write_source, f)
.await
.map(|(ts, t, _)| (ts, t))
}
Expand Down
Loading

0 comments on commit 4121cc0

Please sign in to comment.