Save the currently updating index so that the search can access it at all times

This commit is contained in:
Louis Dureuil 2023-11-10 10:50:19 +01:00
parent 54f0ee1ed2
commit a2d0c73b41
No known key found for this signature in database
3 changed files with 21 additions and 0 deletions

View File

@ -923,6 +923,9 @@ impl IndexScheduler {
self.index_mapper.index(&rtxn, &index_uid)? self.index_mapper.index(&rtxn, &index_uid)?
}; };
// the index operation can take a long time, so save this handle to make it available tothe search for the duration of the tick
*self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone()));
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
index_wtxn.commit()?; index_wtxn.commit()?;
@ -959,6 +962,9 @@ impl IndexScheduler {
Batch::IndexUpdate { index_uid, primary_key, mut task } => { Batch::IndexUpdate { index_uid, primary_key, mut task } => {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?; let index = self.index_mapper.index(&rtxn, &index_uid)?;
// the index update can take a long time, so save this handle to make it available tothe search for the duration of the tick
*self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone()));
if let Some(primary_key) = primary_key.clone() { if let Some(primary_key) = primary_key.clone() {
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;

View File

@ -39,6 +39,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
test_breakpoint_sdr: _, test_breakpoint_sdr: _,
planned_failures: _, planned_failures: _,
run_loop_iteration: _, run_loop_iteration: _,
currently_updating_index: _,
} = scheduler; } = scheduler;
let rtxn = env.read_txn().unwrap(); let rtxn = env.read_txn().unwrap();

View File

@ -331,6 +331,10 @@ pub struct IndexScheduler {
/// The path to the version file of Meilisearch. /// The path to the version file of Meilisearch.
pub(crate) version_file_path: PathBuf, pub(crate) version_file_path: PathBuf,
/// A few types of long running batches of tasks that act on a single index set this field
/// so that a handle to the index is available from other threads (search) in an optimized manner.
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
// ================= test // ================= test
// The next entry is dedicated to the tests. // The next entry is dedicated to the tests.
/// Provide a way to set a breakpoint in multiple part of the scheduler. /// Provide a way to set a breakpoint in multiple part of the scheduler.
@ -374,6 +378,7 @@ impl IndexScheduler {
dumps_path: self.dumps_path.clone(), dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(), auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(), version_file_path: self.version_file_path.clone(),
currently_updating_index: self.currently_updating_index.clone(),
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
#[cfg(test)] #[cfg(test)]
@ -470,6 +475,7 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path, snapshots_path: options.snapshots_path,
auth_path: options.auth_path, auth_path: options.auth_path,
version_file_path: options.version_file_path, version_file_path: options.version_file_path,
currently_updating_index: Arc::new(RwLock::new(None)),
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr, test_breakpoint_sdr,
@ -652,6 +658,11 @@ impl IndexScheduler {
/// If you need to fetch information from or perform an action on all indexes, /// If you need to fetch information from or perform an action on all indexes,
/// see the `try_for_each_index` function. /// see the `try_for_each_index` function.
pub fn index(&self, name: &str) -> Result<Index> { pub fn index(&self, name: &str) -> Result<Index> {
if let Some((current_name, current_index)) = self.currently_updating_index.read().unwrap().as_ref() {
if current_name == name {
return Ok(current_index.clone())
}
}
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name) self.index_mapper.index(&rtxn, name)
} }
@ -1133,6 +1144,9 @@ impl IndexScheduler {
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
}; };
// Reset the currently updating index to relinquish the index handle
*self.currently_updating_index.write().unwrap() = None;
#[cfg(test)] #[cfg(test)]
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;