Skip to content

Commit

Permalink
Add retention running flag to BackfillIndexState (#24069)
Browse files Browse the repository at this point in the history
Create a new index_created_lower_bound field so we can update an index entry while it is in the backfilled state. Currently, we rely on it never updating. Also add a retention_started flag so that the period during which we don't run retention for newly backfilled indexes is minimal.

This change adds and populates the new fields and adds a migration to backfill them properly. I am going to change IndexWorker in a separate PR.

GitOrigin-RevId: 1b28ed0391f203a373ad4706cfcc65b2628eece2
  • Loading branch information
Preslav Le authored and Convex, Inc. committed Mar 28, 2024
1 parent 6377b2b commit 9da04db
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 58 deletions.
3 changes: 2 additions & 1 deletion crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2075,7 +2075,8 @@ impl<RT: Runtime> Application<RT> {
let mut tx = self.begin(identity.clone()).await?;
for (index_name, index_fields) in indexes.into_iter() {
let index_fields = self._validate_user_defined_index_fields(index_fields)?;
let index_metadata = IndexMetadata::new_backfilling(index_name, index_fields);
let index_metadata =
IndexMetadata::new_backfilling(*tx.begin_timestamp(), index_name, index_fields);
let mut model = IndexModel::new(&mut tx);
if let Some(existing_index_metadata) = model
.pending_index_metadata(&index_metadata.name)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,52 @@ use serde::{
Deserialize,
Serialize,
};
use sync_types::Timestamp;

/// Represents state of currently backfilling index.
/// We currently do not checkpoint. Will extend the struct when we do.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
pub struct DatabaseIndexBackfillState;
pub struct DatabaseIndexBackfillState {
// A timestamp when the index was created. Note that this timestamp is slightly
// before the index was committed because we don't know the commit timestamp.
// We need to run retention from this timestamp, because live writes write to
// the index the moment the index committed.
pub index_created_lower_bound: Option<Timestamp>,
// We have done the backfill and the only step left is catch up retention.
pub retention_started: bool,
}

/// On-disk (serialized) form of [`DatabaseIndexBackfillState`].
///
/// Both fields are optional so that records written before these fields
/// existed still deserialize.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SerializedDatabaseIndexBackfillState {
    // TODO: Backfill and remove optional.
    index_created_lower_bound: Option<i64>,
    retention_started: Option<bool>,
}

impl TryFrom<DatabaseIndexBackfillState> for SerializedDatabaseIndexBackfillState {
type Error = anyhow::Error;

fn try_from(_config: DatabaseIndexBackfillState) -> anyhow::Result<Self> {
Ok(Self {})
fn try_from(config: DatabaseIndexBackfillState) -> anyhow::Result<Self> {
Ok(Self {
index_created_lower_bound: config.index_created_lower_bound.map(|ts| ts.into()),
retention_started: Some(config.retention_started),
})
}
}

impl TryFrom<SerializedDatabaseIndexBackfillState> for DatabaseIndexBackfillState {
type Error = anyhow::Error;

fn try_from(_config: SerializedDatabaseIndexBackfillState) -> anyhow::Result<Self> {
Ok(Self)
fn try_from(config: SerializedDatabaseIndexBackfillState) -> anyhow::Result<Self> {
Ok(Self {
index_created_lower_bound: config
.index_created_lower_bound
.map(|ts| ts.try_into())
.transpose()?,
// Treat legacy records as retention not started.
retention_started: config.retention_started.unwrap_or(false),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ impl TryFrom<SerializedDatabaseIndexState> for DatabaseIndexState {
},
SerializedDatabaseIndexState::Backfilled2 => DatabaseIndexState::Backfilled,
SerializedDatabaseIndexState::Enabled => DatabaseIndexState::Enabled,
// TODO(Presley): Backfill and delete Disabled state.
SerializedDatabaseIndexState::Disabled => {
DatabaseIndexState::Backfilling(DatabaseIndexBackfillState)
DatabaseIndexState::Backfilling(DatabaseIndexBackfillState {
index_created_lower_bound: None,
retention_started: false,
})
},
})
}
Expand Down
12 changes: 10 additions & 2 deletions crates/common/src/bootstrap_model/index/index_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::{
Deserialize,
Serialize,
};
use sync_types::Timestamp;
use value::{
codegen_convex_serialization,
ConvexValue,
Expand Down Expand Up @@ -61,12 +62,19 @@ pub struct IndexMetadata<T: TableIdentifier> {
}

impl<T: TableIdentifier> IndexMetadata<T> {
pub fn new_backfilling(name: GenericIndexName<T>, fields: IndexedFields) -> Self {
pub fn new_backfilling(
index_created_lower_bound: Timestamp,
name: GenericIndexName<T>,
fields: IndexedFields,
) -> Self {
Self {
name,
config: IndexConfig::Database {
developer_config: DeveloperDatabaseIndexConfig { fields },
on_disk_state: DatabaseIndexState::Backfilling(DatabaseIndexBackfillState {}),
on_disk_state: DatabaseIndexState::Backfilling(DatabaseIndexBackfillState {
index_created_lower_bound: Some(index_created_lower_bound),
retention_started: false,
}),
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/database/src/bootstrap_model/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
for (index_descriptor, index_schema) in &table_schema.indexes {
let index_name = IndexName::new(table_name.clone(), index_descriptor.clone())?;
indexes_in_schema.push(IndexMetadata::new_backfilling(
*self.tx.begin_timestamp(),
index_name.clone(),
index_schema.fields.clone(),
))
Expand Down Expand Up @@ -788,11 +789,10 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
}
let index_name = TabletIndexName::new(target_table, index.name.descriptor().clone())?;
let metadata = match index.into_value().config {
// Table is empty, so it's okay to create indexes in state Enabled.
IndexConfig::Database {
developer_config: DeveloperDatabaseIndexConfig { fields },
..
} => IndexMetadata::new_backfilling(index_name, fields),
} => IndexMetadata::new_backfilling(*self.tx.begin_timestamp(), index_name, fields),
IndexConfig::Search {
developer_config:
DeveloperSearchIndexConfig {
Expand Down
16 changes: 9 additions & 7 deletions crates/database/src/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,13 +864,15 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
return Ok(());
};

// Don't run retention for indexes that are currently backfilling. This
// is important for correctness since IndexBackfilling and retention
// interact poorly. NOTE that accumulate only adds indexes. Thus we won't
// stop running retention if index is deleted or goes from Enabled to
// Backfilling.
if let DatabaseIndexState::Backfilling { .. } = on_disk_state {
return Ok(());
// Don't run retention for indexes that are still backfilling unless IndexWorker
// has explicitly opted-in to running retention. This is important for
// correctness since index backfill and retention interact poorly.
// NOTE: accumulate only adds indexes. Thus we won't stop running
// retention if index is deleted or changes from Enabled to Backfilling.
if let DatabaseIndexState::Backfilling(state) = on_disk_state {
if !state.retention_started {
return Ok(());
}
}

all_indexes.insert(index_id, (index.name, developer_config.fields));
Expand Down
9 changes: 9 additions & 0 deletions crates/database/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,10 @@ where
let index_name = IndexName::new(table_name.clone(), "a_and_b".parse()?)?;

let mut tx = database.begin(Identity::system()).await?;
let begin_ts = tx.begin_timestamp();
IndexModel::new(&mut tx)
.add_application_index(IndexMetadata::new_backfilling(
*begin_ts,
index_name.clone(),
vec![str::parse("a")?, str::parse("b")?].try_into()?,
))
Expand Down Expand Up @@ -1466,12 +1468,14 @@ async fn test_add_indexes_limit(rt: TestRuntime) -> anyhow::Result<()> {
// load once to initialize
let DbFixtures { db, tp, .. } = DbFixtures::new(&rt).await?;
let mut tx = db.begin(Identity::system()).await?;
let begin_ts = tx.begin_timestamp();

// Add the maximum allowed number of indexes.
for i in 0..MAX_USER_INDEXES {
let field_name = format!("field_{}", i);
IndexModel::new(&mut tx)
.add_application_index(IndexMetadata::new_backfilling(
*begin_ts,
IndexName::new("table".parse()?, format!("by_{}", field_name).parse()?)?,
vec![field_name.parse()?].try_into()?,
))
Expand All @@ -1481,6 +1485,7 @@ async fn test_add_indexes_limit(rt: TestRuntime) -> anyhow::Result<()> {
// Try to add one more. Should fail.
let err = IndexModel::new(&mut tx)
.add_application_index(IndexMetadata::new_backfilling(
*begin_ts,
IndexName::new("table".parse()?, "by_field_max".parse()?)?,
vec!["field_max".parse()?].try_into()?,
))
Expand All @@ -1506,8 +1511,10 @@ async fn test_add_indexes_limit(rt: TestRuntime) -> anyhow::Result<()> {
)
.await?;
let mut tx = db.begin(Identity::system()).await?;
let begin_ts = tx.begin_timestamp();
let err = IndexModel::new(&mut tx)
.add_application_index(IndexMetadata::new_backfilling(
*begin_ts,
IndexName::new("table".parse()?, "by_field_max".parse()?)?,
vec!["field_32".parse()?].try_into()?,
))
Expand Down Expand Up @@ -1590,8 +1597,10 @@ async fn test_index_backfill(rt: TestRuntime) -> anyhow::Result<()> {

let index_name = IndexName::new(table_name, "a_and_b".parse()?)?;
let mut tx = db.begin_system().await?;
let begin_ts = tx.begin_timestamp();
IndexModel::new(&mut tx)
.add_application_index(IndexMetadata::new_backfilling(
*begin_ts,
index_name.clone(),
vec![str::parse("a")?, str::parse("b")?].try_into()?,
))
Expand Down
3 changes: 3 additions & 0 deletions crates/database/src/writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ mod tests {
},
};
use maplit::btreeset;
use sync_types::Timestamp;
use value::{
assert_obj,
ResolvedDocumentId,
Expand Down Expand Up @@ -369,6 +370,7 @@ mod tests {
id_generator.generate(&INDEX_TABLE),
CreationTime::ONE,
IndexMetadata::new_backfilling(
Timestamp::MIN,
TabletIndexName::new(user_table1.table_id, "by_likes".parse()?)?,
IndexedFields::by_id(),
)
Expand Down Expand Up @@ -398,6 +400,7 @@ mod tests {
id_generator.generate(&INDEX_TABLE),
CreationTime::ONE,
IndexMetadata::new_backfilling(
Timestamp::MIN,
TabletIndexName::new(user_table2.table_id, "by_likes".parse()?)?,
IndexedFields::by_id(),
)
Expand Down
49 changes: 21 additions & 28 deletions crates/indexing/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use common::{
GenericIndexName,
PersistenceVersion,
TableName,
TabletIndexName,
Timestamp,
},
};
Expand Down Expand Up @@ -328,15 +327,23 @@ fn test_second_pending_index_for_name_fails() -> anyhow::Result<()> {
let by_name = GenericIndexName::new(table.table_id, "by_name".parse()?)?;
let pending = gen_index_document(
&mut id_generator,
IndexMetadata::new_backfilling(by_name.clone(), vec!["name".parse()?].try_into()?),
IndexMetadata::new_backfilling(
Timestamp::MIN,
by_name.clone(),
vec!["name".parse()?].try_into()?,
),
)?;
let result = index_registry.update(None, Some(&pending));
assert!(result.is_ok());
let name_collision = ResolvedDocument::new(
id_generator.generate(&INDEX_TABLE),
CreationTime::ONE,
IndexMetadata::new_backfilling(by_name.clone(), vec!["other_field".parse()?].try_into()?)
.try_into()?,
IndexMetadata::new_backfilling(
Timestamp::MIN,
by_name.clone(),
vec!["other_field".parse()?].try_into()?,
)
.try_into()?,
)?;
let result = index_registry.update(None, Some(&name_collision));
assert!(result.is_err());
Expand Down Expand Up @@ -703,44 +710,30 @@ fn new_enabled_doc(
name: &str,
fields: Vec<&str>,
) -> anyhow::Result<ResolvedDocument> {
new_index_doc(
id_generator,
table_id,
name,
fields,
&IndexMetadata::new_enabled,
)
}
let index_name = GenericIndexName::new(table_id, name.parse()?)?;
let field_paths = fields
.into_iter()
.map(|field| field.parse())
.collect::<anyhow::Result<Vec<FieldPath>>>()?;

fn new_pending_doc(
id_generator: &mut dyn IdGenerator,
table_id: TableId,
name: &str,
fields: Vec<&str>,
) -> anyhow::Result<ResolvedDocument> {
new_index_doc(
id_generator,
table_id,
name,
fields,
&IndexMetadata::new_backfilling,
)
let metadata = IndexMetadata::new_enabled(index_name, field_paths.try_into()?);
gen_index_document(id_generator, metadata)
}

fn new_index_doc(
fn new_pending_doc(
id_generator: &mut dyn IdGenerator,
table_id: TableId,
name: &str,
fields: Vec<&str>,
get_metadata: &dyn Fn(TabletIndexName, IndexedFields) -> TabletIndexMetadata,
) -> anyhow::Result<ResolvedDocument> {
let index_name = GenericIndexName::new(table_id, name.parse()?)?;
let field_paths = fields
.into_iter()
.map(|field| field.parse())
.collect::<anyhow::Result<Vec<FieldPath>>>()?;

let metadata = get_metadata(index_name, field_paths.try_into()?);
let metadata =
IndexMetadata::new_backfilling(Timestamp::MIN, index_name, field_paths.try_into()?);
gen_index_document(id_generator, metadata)
}

Expand Down
1 change: 1 addition & 0 deletions crates/isolate/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl<RT: Runtime, P: Persistence + Clone> UdfTest<RT, P> {
let mut tx = self.database.begin(Identity::system()).await?;
let index_name = name.parse()?;
let index = IndexMetadata::new_backfilling(
*tx.begin_timestamp(),
index_name,
IndexedFields::try_from(vec![field.parse()?])?,
);
Expand Down
2 changes: 2 additions & 0 deletions crates/isolate/src/tests/adversarial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ async fn test_read_many_documents(rt: TestRuntime) -> anyhow::Result<()> {
async fn test_reads_too_many(rt: TestRuntime) -> anyhow::Result<()> {
let t = UdfTest::default(rt).await?;
t.add_index(IndexMetadata::new_backfilling(
*t.database.now_ts_for_reads(),
"test.by_hello".parse()?,
IndexedFields::try_from(vec!["hello".parse()?])?,
))
Expand All @@ -138,6 +139,7 @@ async fn test_reads_too_many(rt: TestRuntime) -> anyhow::Result<()> {
async fn test_reads_many(rt: TestRuntime) -> anyhow::Result<()> {
let t = UdfTest::default(rt).await?;
t.add_index(IndexMetadata::new_backfilling(
*t.database.now_ts_for_reads(),
"test.by_hello".parse()?,
IndexedFields::try_from(vec!["hello".parse()?])?,
))
Expand Down
2 changes: 2 additions & 0 deletions crates/isolate/src/tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{

async fn add_index<RT: Runtime, P: Persistence + Clone>(t: &UdfTest<RT, P>) -> anyhow::Result<()> {
t.add_index(IndexMetadata::new_backfilling(
*t.database.now_ts_for_reads(),
"myTable.by_a_b".parse()?,
IndexedFields::try_from(vec!["a".parse()?, "b".parse()?])?,
))
Expand Down Expand Up @@ -739,6 +740,7 @@ async fn test_query_journal_start_to_end(rt: TestRuntime) -> anyhow::Result<()>
async fn test_query_journal_middle_to_middle(rt: TestRuntime) -> anyhow::Result<()> {
let t = UdfTest::default(rt).await?;
t.add_index(IndexMetadata::new_backfilling(
*t.database.now_ts_for_reads(),
"test.by_hello".parse()?,
IndexedFields::try_from(vec!["hello".parse()?])?,
))
Expand Down
7 changes: 5 additions & 2 deletions crates/model/src/config/index_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ pub(crate) use db_schema_with_indexes;

use super::types::ConfigMetadata;

pub fn assert_root_cause<T: Debug>(result: anyhow::Result<T>, expected: &str) {
pub fn assert_root_cause_contains<T: Debug>(result: anyhow::Result<T>, expected: &str) {
let error = result.unwrap_err();
let root_cause = error.root_cause();
assert_eq!(format!("{}", root_cause), expected);
assert!(
format!("{}", root_cause).contains(expected),
"Root cause \"{root_cause}\" does not contain expected string:\n\"{expected}\""
);
}

/// Simulate a CLI pushing a schema, waiting for backfill, then committing the
Expand Down
Loading

0 comments on commit 9da04db

Please sign in to comment.