mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-01 17:15:46 +01:00
fix a synchronization bug while importing tasks
This commit is contained in:
parent
a9eeb070b8
commit
e0221fc0a3
@ -534,6 +534,7 @@ impl IndexScheduler {
|
|||||||
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
|
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
|
||||||
Ok(self.file_store.new_update()?)
|
Ok(self.file_store.new_update()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
|
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
|
||||||
Ok(self.file_store.new_update_with_uuid(uuid)?)
|
Ok(self.file_store.new_update_with_uuid(uuid)?)
|
||||||
|
@ -196,15 +196,12 @@ fn import_dump(
|
|||||||
keys.push(key);
|
keys.push(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Import the tasks.
|
let indexer_config = index_scheduler.indexer_config().clone();
|
||||||
for ret in dump_reader.tasks() {
|
|
||||||
let (task, file) = ret?;
|
|
||||||
index_scheduler.register_dumpped_task(task, file, &keys, instance_uid)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let indexer_config = index_scheduler.indexer_config();
|
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||||
|
// try to process tasks while we're trying to import the indexes.
|
||||||
|
|
||||||
// 4. Import the indexes.
|
// 3. Import the indexes.
|
||||||
for index_reader in dump_reader.indexes()? {
|
for index_reader in dump_reader.indexes()? {
|
||||||
let mut index_reader = index_reader?;
|
let mut index_reader = index_reader?;
|
||||||
let metadata = index_reader.metadata();
|
let metadata = index_reader.metadata();
|
||||||
@ -214,12 +211,12 @@ fn import_dump(
|
|||||||
let mut wtxn = index.write_txn()?;
|
let mut wtxn = index.write_txn()?;
|
||||||
|
|
||||||
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
|
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
|
||||||
// 4.1 Import the primary key if there is one.
|
// 3.1 Import the primary key if there is one.
|
||||||
if let Some(ref primary_key) = metadata.primary_key {
|
if let Some(ref primary_key) = metadata.primary_key {
|
||||||
builder.set_primary_key(primary_key.to_string());
|
builder.set_primary_key(primary_key.to_string());
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4.2 Import the settings.
|
// 3.2 Import the settings.
|
||||||
log::info!("Importing the settings.");
|
log::info!("Importing the settings.");
|
||||||
let settings = index_reader.settings()?;
|
let settings = index_reader.settings()?;
|
||||||
apply_settings_to_builder(&settings, &mut builder);
|
apply_settings_to_builder(&settings, &mut builder);
|
||||||
@ -227,8 +224,8 @@ fn import_dump(
|
|||||||
log::debug!("update: {:?}", indexing_step);
|
log::debug!("update: {:?}", indexing_step);
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// 4.3 Import the documents.
|
// 3.3 Import the documents.
|
||||||
// 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
|
// 3.3.1 We need to recreate the grenad+obkv format accepted by the index.
|
||||||
log::info!("Importing the documents.");
|
log::info!("Importing the documents.");
|
||||||
let mut file = tempfile::tempfile()?;
|
let mut file = tempfile::tempfile()?;
|
||||||
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(&mut file));
|
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(&mut file));
|
||||||
@ -237,7 +234,7 @@ fn import_dump(
|
|||||||
}
|
}
|
||||||
builder.into_inner()?; // this actually flush the content of the batch builder.
|
builder.into_inner()?; // this actually flush the content of the batch builder.
|
||||||
|
|
||||||
// 4.3.2 We feed it to the milli index.
|
// 3.3.2 We feed it to the milli index.
|
||||||
file.seek(SeekFrom::Start(0))?;
|
file.seek(SeekFrom::Start(0))?;
|
||||||
let reader = BufReader::new(file);
|
let reader = BufReader::new(file);
|
||||||
let reader = DocumentsBatchReader::from_reader(reader)?;
|
let reader = DocumentsBatchReader::from_reader(reader)?;
|
||||||
@ -259,6 +256,12 @@ fn import_dump(
|
|||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
log::info!("All documents successfully imported.");
|
log::info!("All documents successfully imported.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 4. Import the tasks.
|
||||||
|
for ret in dump_reader.tasks() {
|
||||||
|
let (task, file) = ret?;
|
||||||
|
index_scheduler.register_dumpped_task(task, file, &keys, instance_uid)?;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user