first working version

tamo 2021-05-06 18:44:16 +02:00
parent 5f5402a3ab
commit 40ced3ff8d
5 changed files with 68 additions and 27 deletions

View File

@@ -59,10 +59,11 @@ impl DumpVersion {
         size: usize,
         dump_path: &Path,
         index_path: &Path,
+        primary_key: Option<&str>,
     ) -> anyhow::Result<()> {
         match self {
-            Self::V1 => v1::import_index(size, dump_path, index_path),
-            Self::V2 => v2::import_index(size, dump_path, index_path),
+            Self::V1 => v1::import_index(size, dump_path, index_path, primary_key),
+            Self::V2 => v2::import_index(size, dump_path, index_path, primary_key),
         }
     }
 }
@@ -206,7 +207,26 @@ pub fn load_dump(
         // this cannot fail since we created all the missing uuid in the previous loop
         let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
         let index_path = db_path.join(&format!("indexes/index-{}", uuid));
-        // let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db
+        // let update_path = db_path.join(&format!("updates"));
+        info!("importing the updates");
+        use crate::index_controller::update_actor::UpdateStore;
+        use std::io::BufRead;
+        let update_path = db_path.join("updates");
+        let options = EnvOpenOptions::new();
+        // create an UpdateStore to import the updates
+        std::fs::create_dir_all(&update_path)?;
+        let (update_store, _) = UpdateStore::create(options, update_path)?;
+        let file = File::open(&dump_path.join("updates.jsonl"))?;
+        let reader = std::io::BufReader::new(file);
+        let mut wtxn = update_store.env.write_txn()?;
+        for update in reader.lines() {
+            let update = serde_json::from_str(&update?)?;
+            update_store.register_raw_updates(&mut wtxn, update, uuid)?;
+        }
+        wtxn.commit()?;

         info!(
             "Importing dump from {} into {}...",
@@ -215,11 +235,12 @@ pub fn load_dump(
         );
         metadata
             .dump_version
-            .import_index(size, &dump_path, &index_path)
+            .import_index(size, &dump_path, &index_path, idx.meta.primary_key.as_ref().map(|s| s.as_ref()))
             .unwrap();
         info!("Dump importation from {} succeed", dump_path.display());
     }

     info!("Dump importation from {} succeed", dump_path.display());
     Ok(())
 }
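Note on the block added to load_dump: it creates an UpdateStore next to the indexes, then replays the dump's updates.jsonl into it, one JSON-encoded update per line, each line deserialized on its own and registered under a single write transaction. Stripped of the store specifics, the reading pattern is roughly the following sketch; read_updates and the use of serde_json::Value are stand-ins, only the updates.jsonl layout and the line-by-line serde_json::from_str come from the diff.

    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use std::path::Path;

    // Minimal sketch of the `updates.jsonl` reading pattern: one JSON document per
    // line, each line deserialized independently. The real code deserializes into
    // `UpdateStatus` and hands each value to `UpdateStore::register_raw_updates`
    // inside one write transaction instead of collecting them into a Vec.
    fn read_updates(dump_path: &Path) -> anyhow::Result<Vec<serde_json::Value>> {
        let file = File::open(dump_path.join("updates.jsonl"))?;
        let reader = BufReader::new(file);
        let mut updates = Vec::new();
        for line in reader.lines() {
            let update: serde_json::Value = serde_json::from_str(&line?)?;
            updates.push(update);
        }
        Ok(updates)
    }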

View File

@@ -78,7 +78,7 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
 }

-pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
+pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
     info!("Importing a dump from an old version of meilisearch with dump version 1");

     std::fs::create_dir_all(&index_path)?;
@@ -102,7 +102,7 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
         IndexDocumentsMethod::ReplaceDocuments,
         Some(reader),
         update_builder,
-        None,
+        primary_key,
     )?;

     // at this point we should handle the updates, but since the update logic is not handled in

View File

@@ -1,5 +1,5 @@
 use heed::EnvOpenOptions;
-use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
+use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}};
 use crate::index::Index;
 use crate::index_controller::Settings;
 use std::{fs::File, path::Path, sync::Arc};
@@ -14,7 +14,7 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
     Ok(metadata)
 }

-pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
+pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
     std::fs::create_dir_all(&index_path)?;
     let mut options = EnvOpenOptions::new();
     options.map_size(size);
@@ -26,17 +26,21 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
     let update_builder = UpdateBuilder::new(0);
     index.update_settings(&settings, update_builder)?;

+    // import the documents in the index
     let update_builder = UpdateBuilder::new(1);
     let file = File::open(&dump_path.join("documents.jsonl"))?;
     let reader = std::io::BufReader::new(file);

-    index.update_documents(
+    // TODO: TAMO: currently we ignore any error caused by the importation of the documents because
+    // if there is no documents nor primary key it'll throw an anyhow error, but we must remove
+    // this before the merge on main
+    let _ = index.update_documents(
         UpdateFormat::JsonStream,
         IndexDocumentsMethod::ReplaceDocuments,
         Some(reader),
         update_builder,
-        None,
-    )?;
+        primary_key,
+    );

     // the last step: we extract the original milli::Index and close it
     Arc::try_unwrap(index.0)

View File

@@ -31,7 +31,7 @@ pub type IndexResult<T> = std::result::Result<T, IndexError>;
 pub struct IndexMeta {
     created_at: DateTime<Utc>,
     pub updated_at: DateTime<Utc>,
-    primary_key: Option<String>,
+    pub primary_key: Option<String>,
 }

 impl IndexMeta {

View File

@@ -250,21 +250,31 @@ impl UpdateStore {
             .get(txn, &NextIdKey::Global)?
             .map(U64::get)
             .unwrap_or_default();
+        self.next_update_id
+            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
+
+        let update_id = self.next_update_id_raw(txn, index_uuid)?;
+
+        Ok((global_id, update_id))
+    }
+
+    /// Returns the next next update id for a given `index_uuid` without
+    /// incrementing the global update id. This is useful for the dumps.
+    fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
         let update_id = self
             .next_update_id
             .get(txn, &NextIdKey::Index(index_uuid))?
             .map(U64::get)
             .unwrap_or_default();
-        self.next_update_id
-            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
         self.next_update_id.put(
             txn,
             &NextIdKey::Index(index_uuid),
             &BEU64::new(update_id + 1),
         )?;

-        Ok((global_id, update_id))
+        Ok(update_id)
     }

     /// Registers the update content in the pending store and the meta
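Note on the hunk above: it splits update-id allocation in two. next_update_id still hands out a (global_id, update_id) pair and bumps both counters, while the new next_update_id_raw only bumps the per-index counter, which is what the dump import needs for updates that never re-enter the pending queue. The following is an in-memory illustration of the two-counter scheme, not the heed-backed implementation; IdAllocator and the u128 index key are stand-ins.

    use std::collections::HashMap;

    // Two counters: one global ordering counter shared by all indexes, and one
    // monotonically increasing counter per index.
    #[derive(Default)]
    struct IdAllocator {
        global: u64,
        per_index: HashMap<u128, u64>, // stand-in for the Uuid-keyed heed database
    }

    impl IdAllocator {
        // Bumps both counters: used when an update must also take a global queue position.
        fn next_update_id(&mut self, index: u128) -> (u64, u64) {
            let global_id = self.global;
            self.global += 1;
            (global_id, self.next_update_id_raw(index))
        }

        // Bumps only the per-index counter: used when importing already processed
        // updates from a dump, which must not consume global queue positions.
        fn next_update_id_raw(&mut self, index: u128) -> u64 {
            let counter = self.per_index.entry(index).or_insert(0);
            let update_id = *counter;
            *counter += 1;
            update_id
        }
    }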
@@ -291,17 +301,27 @@ impl UpdateStore {
         Ok(meta)
     }

-    /// Push already processed updates in the UpdateStore. This is useful for the dumps
-    pub fn register_already_processed_update(
+    /// Push already processed update in the UpdateStore without triggering the notification
+    /// process. This is useful for the dumps.
+    pub fn register_raw_updates(
         &self,
-        result: UpdateStatus,
+        wtxn: &mut heed::RwTxn,
+        update: UpdateStatus,
         index_uuid: Uuid,
     ) -> heed::Result<()> {
-        // TODO: TAMO: load already processed updates
-        let mut wtxn = self.env.write_txn()?;
-        let (_global_id, update_id) = self.next_update_id(&mut wtxn, index_uuid)?;
-        self.updates.remap_key_type::<UpdateKeyCodec>().put(&mut wtxn, &(index_uuid, update_id), &result)?;
-        wtxn.commit()
+        // TODO: TAMO: since I don't want to store anything I currently generate a new global ID
+        // everytime I encounter an enqueued update, can we do better?
+        match update {
+            UpdateStatus::Enqueued(enqueued) => {
+                let (global_id, update_id) = self.next_update_id(wtxn, index_uuid)?;
+                self.pending_queue.remap_key_type::<PendingKeyCodec>().put(wtxn, &(global_id, index_uuid, update_id), &enqueued)?;
+            }
+            _ => {
+                let update_id = self.next_update_id_raw(wtxn, index_uuid)?;
+                self.updates.remap_key_type::<UpdateKeyCodec>().put(wtxn, &(index_uuid, update_id), &update)?;
+            }
+        }
+        Ok(())
     }

     /// Executes the user provided function on the next pending update (the one with the lowest id).
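Note on register_raw_updates: each entry read from the dump is routed by its status. An enqueued update gets a fresh (global_id, update_id) pair and goes to the pending queue so the update loop will process it again, while any other status is already final and is written straight to the updates database under (index_uuid, update_id). The same routing, sketched over in-memory maps; the Status and Stores types and the u128 uuid stand-in are illustrative, not the real codecs.

    use std::collections::BTreeMap;

    enum Status {
        Enqueued(String),
        Finished(String), // stands in for the processed / failed / aborted variants
    }

    #[derive(Default)]
    struct Stores {
        // keyed by (global_id, index_uuid, update_id): replayed by the update loop
        pending_queue: BTreeMap<(u64, u128, u64), String>,
        // keyed by (index_uuid, update_id): terminal statuses, never replayed
        updates: BTreeMap<(u128, u64), String>,
    }

    impl Stores {
        fn register_raw_update(
            &mut self,
            update: Status,
            index_uuid: u128,
            global_id: u64,
            update_id: u64,
        ) {
            match update {
                Status::Enqueued(payload) => {
                    self.pending_queue
                        .insert((global_id, index_uuid, update_id), payload);
                }
                Status::Finished(payload) => {
                    self.updates.insert((index_uuid, update_id), payload);
                }
            }
        }
    }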
@@ -542,9 +562,6 @@ impl UpdateStore {
             }
         }

-        // TODO: TAMO: the updates
-        // already processed updates seems to works, but I've not tried with currently running updates
-
         let update_files_path = path.join("update_files");
         create_dir_all(&update_files_path)?;
@@ -561,7 +578,6 @@ impl UpdateStore {
             }
         }

-
         // Perform the dump of each index concurently. Only a third of the capabilities of
         // the index actor at a time not to put too much pressure on the index actor
         let path = &path;