mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 23:04:26 +01:00
Merge #4308
4308: Fix hang on `/indexes` and `/stats` routes r=Kerollmops a=dureuill # Pull Request ## Related issue Fixes #4218 ## Context - A previous fix added a field to the `IndexScheduler` to memorize the `currently_updating_index`, so that accessing it through the search would return the handle without trying to open it. This resolved a hang on the search, but #4218 reported further hangs on the `/indexes` and `/stats` routes - These routes were shunting the `IndexScheduler` and using internal `IndexMapper` logic to access the indexes, again trying to reopen the updating index. ## What does this PR do? - Moves the logic relative to the `currently_updating_index` from the `IndexScheduler` to the `IndexMapper`, so that any index request to the `IndexMapper` can benefit from it. ## Test 1. Follow reproducer from #4218 2. Before this PR, notice a hang on `/stats` and `/indexes`, but not on `/indexes/<updating_index>/search` 3. After this PR, notice no hang on either of `/stats`, `/indexes` or `/indexes/<updating_index>/search` Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
93363b0201
@ -936,8 +936,8 @@ impl IndexScheduler {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
|
// the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
|
||||||
*self.currently_updating_index.write().unwrap() =
|
self.index_mapper
|
||||||
Some((index_uid.clone(), index.clone()));
|
.set_currently_updating_index(Some((index_uid.clone(), index.clone())));
|
||||||
|
|
||||||
let mut index_wtxn = index.write_txn()?;
|
let mut index_wtxn = index.write_txn()?;
|
||||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||||
|
@ -69,6 +69,10 @@ pub struct IndexMapper {
|
|||||||
/// Whether we open a meilisearch index with the MDB_WRITEMAP option or not.
|
/// Whether we open a meilisearch index with the MDB_WRITEMAP option or not.
|
||||||
enable_mdb_writemap: bool,
|
enable_mdb_writemap: bool,
|
||||||
pub indexer_config: Arc<IndexerConfig>,
|
pub indexer_config: Arc<IndexerConfig>,
|
||||||
|
|
||||||
|
/// A few types of long running batches of tasks that act on a single index set this field
|
||||||
|
/// so that a handle to the index is available from other threads (search) in an optimized manner.
|
||||||
|
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Whether the index is available for use or is forbidden to be inserted back in the index map
|
/// Whether the index is available for use or is forbidden to be inserted back in the index map
|
||||||
@ -151,6 +155,7 @@ impl IndexMapper {
|
|||||||
index_growth_amount,
|
index_growth_amount,
|
||||||
enable_mdb_writemap,
|
enable_mdb_writemap,
|
||||||
indexer_config: Arc::new(indexer_config),
|
indexer_config: Arc::new(indexer_config),
|
||||||
|
currently_updating_index: Default::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -303,6 +308,14 @@ impl IndexMapper {
|
|||||||
|
|
||||||
/// Return an index, may open it if it wasn't already opened.
|
/// Return an index, may open it if it wasn't already opened.
|
||||||
pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result<Index> {
|
pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result<Index> {
|
||||||
|
if let Some((current_name, current_index)) =
|
||||||
|
self.currently_updating_index.read().unwrap().as_ref()
|
||||||
|
{
|
||||||
|
if current_name == name {
|
||||||
|
return Ok(current_index.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let uuid = self
|
let uuid = self
|
||||||
.index_mapping
|
.index_mapping
|
||||||
.get(rtxn, name)?
|
.get(rtxn, name)?
|
||||||
@ -474,4 +487,8 @@ impl IndexMapper {
|
|||||||
pub fn indexer_config(&self) -> &IndexerConfig {
|
pub fn indexer_config(&self) -> &IndexerConfig {
|
||||||
&self.indexer_config
|
&self.indexer_config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_currently_updating_index(&self, index: Option<(String, Index)>) {
|
||||||
|
*self.currently_updating_index.write().unwrap() = index;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,6 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
|||||||
test_breakpoint_sdr: _,
|
test_breakpoint_sdr: _,
|
||||||
planned_failures: _,
|
planned_failures: _,
|
||||||
run_loop_iteration: _,
|
run_loop_iteration: _,
|
||||||
currently_updating_index: _,
|
|
||||||
embedders: _,
|
embedders: _,
|
||||||
} = scheduler;
|
} = scheduler;
|
||||||
|
|
||||||
|
@ -351,10 +351,6 @@ pub struct IndexScheduler {
|
|||||||
/// The path to the version file of Meilisearch.
|
/// The path to the version file of Meilisearch.
|
||||||
pub(crate) version_file_path: PathBuf,
|
pub(crate) version_file_path: PathBuf,
|
||||||
|
|
||||||
/// A few types of long running batches of tasks that act on a single index set this field
|
|
||||||
/// so that a handle to the index is available from other threads (search) in an optimized manner.
|
|
||||||
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
|
|
||||||
|
|
||||||
embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>,
|
embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>,
|
||||||
|
|
||||||
// ================= test
|
// ================= test
|
||||||
@ -403,7 +399,6 @@ impl IndexScheduler {
|
|||||||
version_file_path: self.version_file_path.clone(),
|
version_file_path: self.version_file_path.clone(),
|
||||||
webhook_url: self.webhook_url.clone(),
|
webhook_url: self.webhook_url.clone(),
|
||||||
webhook_authorization_header: self.webhook_authorization_header.clone(),
|
webhook_authorization_header: self.webhook_authorization_header.clone(),
|
||||||
currently_updating_index: self.currently_updating_index.clone(),
|
|
||||||
embedders: self.embedders.clone(),
|
embedders: self.embedders.clone(),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
||||||
@ -504,7 +499,6 @@ impl IndexScheduler {
|
|||||||
version_file_path: options.version_file_path,
|
version_file_path: options.version_file_path,
|
||||||
webhook_url: options.webhook_url,
|
webhook_url: options.webhook_url,
|
||||||
webhook_authorization_header: options.webhook_authorization_header,
|
webhook_authorization_header: options.webhook_authorization_header,
|
||||||
currently_updating_index: Arc::new(RwLock::new(None)),
|
|
||||||
embedders: Default::default(),
|
embedders: Default::default(),
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -688,13 +682,6 @@ impl IndexScheduler {
|
|||||||
/// If you need to fetch information from or perform an action on all indexes,
|
/// If you need to fetch information from or perform an action on all indexes,
|
||||||
/// see the `try_for_each_index` function.
|
/// see the `try_for_each_index` function.
|
||||||
pub fn index(&self, name: &str) -> Result<Index> {
|
pub fn index(&self, name: &str) -> Result<Index> {
|
||||||
if let Some((current_name, current_index)) =
|
|
||||||
self.currently_updating_index.read().unwrap().as_ref()
|
|
||||||
{
|
|
||||||
if current_name == name {
|
|
||||||
return Ok(current_index.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let rtxn = self.env.read_txn()?;
|
let rtxn = self.env.read_txn()?;
|
||||||
self.index_mapper.index(&rtxn, name)
|
self.index_mapper.index(&rtxn, name)
|
||||||
}
|
}
|
||||||
@ -1175,7 +1162,7 @@ impl IndexScheduler {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Reset the currently updating index to relinquish the index handle
|
// Reset the currently updating index to relinquish the index handle
|
||||||
*self.currently_updating_index.write().unwrap() = None;
|
self.index_mapper.set_currently_updating_index(None);
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
||||||
|
Loading…
Reference in New Issue
Block a user