From 1667e1b32fc1ca30e8ac9821c856b989bf4f1e7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 16 Oct 2019 17:05:24 +0200 Subject: [PATCH] Move to zerocopy-lmdb --- meilidb-core/Cargo.toml | 9 +- meilidb-core/examples/from_file.rs | 12 +- meilidb-core/src/automaton/mod.rs | 4 +- meilidb-core/src/database.rs | 81 ++++----- meilidb-core/src/error.rs | 10 +- meilidb-core/src/query_builder.rs | 79 +++++---- meilidb-core/src/ranked_map.rs | 3 + meilidb-core/src/serde/deserializer.rs | 18 +- meilidb-core/src/serde/mod.rs | 10 +- meilidb-core/src/store/docs_words.rs | 43 +++-- meilidb-core/src/store/documents_fields.rs | 91 ++++------ .../src/store/documents_fields_counts.rs | 125 ++++++-------- meilidb-core/src/store/main.rs | 148 ++++------------ meilidb-core/src/store/mod.rs | 160 ++++++++++-------- meilidb-core/src/store/postings_lists.rs | 76 ++------- meilidb-core/src/store/synonyms.rs | 39 ++--- meilidb-core/src/store/updates.rs | 108 ++++-------- meilidb-core/src/store/updates_results.rs | 62 ++----- meilidb-core/src/update/customs_update.rs | 11 +- meilidb-core/src/update/documents_addition.rs | 6 +- meilidb-core/src/update/documents_deletion.rs | 6 +- meilidb-core/src/update/mod.rs | 15 +- meilidb-core/src/update/schema_update.rs | 6 +- meilidb-core/src/update/synonyms_addition.rs | 6 +- meilidb-core/src/update/synonyms_deletion.rs | 6 +- 25 files changed, 450 insertions(+), 684 deletions(-) diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index b73d1390a..0469996e2 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -12,20 +12,23 @@ crossbeam-channel = "0.3.9" deunicode = "1.0.0" env_logger = "0.7.0" hashbrown = { version = "0.6.0", features = ["serde"] } -lmdb-rkv = "0.12.3" log = "0.4.8" meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } once_cell = "1.2.0" ordered-float = { version = "1.0.2", features = ["serde"] } -rkv = "0.10.2" -sdset = "0.3.2" +sdset = "0.3.3" serde = { version = "1.0.101", features = ["derive"] } serde_json = "1.0.41" siphasher = "0.3.0" slice-group-by = "0.2.6" zerocopy = "0.2.8" +[dependencies.zlmdb] +package = "zerocopy-lmdb" +git = "https://github.com/Kerollmops/zerocopy-lmdb.git" +branch = "master" + [dependencies.levenshtein_automata] git = "https://github.com/Kerollmops/levenshtein-automata.git" branch = "arc-byte-slice" diff --git a/meilidb-core/examples/from_file.rs b/meilidb-core/examples/from_file.rs index 42113e32b..a5d8564f2 100644 --- a/meilidb-core/examples/from_file.rs +++ b/meilidb-core/examples/from_file.rs @@ -94,14 +94,14 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box { if current_schema != schema { @@ -150,7 +150,7 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box Result<(), Box> { - let rkv = database.rkv.read().unwrap(); + let env = &database.env; let index = database.open_index(INDEX_NAME).expect("Could not find index"); - let reader = rkv.read().unwrap(); + let reader = env.read_txn().unwrap(); let schema = index.main.schema(&reader)?; let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?; @@ -317,7 +317,7 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box< doc.highlights.sort_unstable_by_key(|m| (m.char_index, m.char_length)); let start_retrieve = Instant::now(); - let result = index.document::<_, Document>(&reader, Some(&fields), doc.id); + let result = index.document::(&reader, Some(&fields), doc.id); retrieve_duration += start_retrieve.elapsed(); match result { diff --git a/meilidb-core/src/automaton/mod.rs b/meilidb-core/src/automaton/mod.rs index f1d864a9a..4281a8c21 100644 --- a/meilidb-core/src/automaton/mod.rs +++ b/meilidb-core/src/automaton/mod.rs @@ -23,7 +23,7 @@ pub struct AutomatonProducer { impl AutomatonProducer { pub fn new( - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, query: &str, main_store: store::Main, synonyms_store: store::Synonyms, @@ -108,7 +108,7 @@ pub fn normalize_str(string: &str) -> String { } fn generate_automatons( - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, query: &str, main_store: store::Main, synonym_store: store::Synonyms, diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index af733d034..f5ae46347 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::sync::{Arc, RwLock}; use std::{fs, thread}; +use zlmdb::types::{Str, Unit}; use crossbeam_channel::Receiver; use log::{debug, error}; @@ -12,27 +13,22 @@ pub type BoxUpdateFn = Box type ArcSwapFn = arc_swap::ArcSwapOption; pub struct Database { - pub rkv: Arc>, - common_store: rkv::SingleStore, - indexes_store: rkv::SingleStore, + pub env: zlmdb::Env, + common_store: zlmdb::DynDatabase, + indexes_store: zlmdb::Database, indexes: RwLock, thread::JoinHandle<()>)>>, } fn update_awaiter( receiver: Receiver<()>, - rkv: Arc>, + env: zlmdb::Env, update_fn: Arc, index: Index, ) { for () in receiver { // consume all updates in order (oldest first) loop { - let rkv = match rkv.read() { - Ok(rkv) => rkv, - Err(e) => { error!("rkv RwLock read failed: {}", e); break } - }; - - let mut writer = match rkv.write() { + let mut writer = match env.write_txn() { Ok(writer) => writer, Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break } }; @@ -55,64 +51,57 @@ fn update_awaiter( impl Database { pub fn open_or_create(path: impl AsRef) -> MResult { - let manager = rkv::Manager::singleton(); - let mut rkv_write = manager.write().unwrap(); - fs::create_dir_all(path.as_ref())?; - let rkv = rkv_write - .get_or_create(path.as_ref(), |path| { - let mut builder = rkv::Rkv::environment_builder(); - builder.set_max_dbs(3000).set_map_size(10 * 1024 * 1024 * 1024); // 10GB - rkv::Rkv::from_env(path, builder) - })?; + let env = zlmdb::EnvOpenOptions::new() + .map_size(10 * 1024 * 1024 * 1024) // 10GB + .max_dbs(3000) + .open(path)?; - drop(rkv_write); - - let rkv_read = rkv.read().unwrap(); - let create_options = rkv::store::Options::create(); - let common_store = rkv_read.open_single("common", create_options)?; - let indexes_store = rkv_read.open_single("indexes", create_options)?; + let common_store = env.create_dyn_database(Some("common"))?; + let indexes_store = env.create_database::(Some("indexes"))?; // list all indexes that needs to be opened let mut must_open = Vec::new(); - let reader = rkv_read.read()?; - for result in indexes_store.iter_start(&reader)? { - let (key, _) = result?; - if let Ok(index_name) = std::str::from_utf8(key) { - must_open.push(index_name.to_owned()); - } + let reader = env.read_txn()?; + for result in indexes_store.iter(&reader)? { + let (index_name, _) = result?; + must_open.push(index_name.to_owned()); } - drop(reader); + reader.abort(); // open the previously aggregated indexes let mut indexes = HashMap::new(); for index_name in must_open { let (sender, receiver) = crossbeam_channel::bounded(100); - let index = store::open(&rkv_read, &index_name, sender.clone())?; + let index = match store::open(&env, &index_name, sender.clone())? { + Some(index) => index, + None => { + log::warn!("the index {} doesn't exist or has not all the databases", index_name); + continue; + }, + }; let update_fn = Arc::new(ArcSwapFn::empty()); - let rkv_clone = rkv.clone(); + let env_clone = env.clone(); let index_clone = index.clone(); let update_fn_clone = update_fn.clone(); let handle = thread::spawn(move || { - update_awaiter(receiver, rkv_clone, update_fn_clone, index_clone) + update_awaiter(receiver, env_clone, update_fn_clone, index_clone) }); // send an update notification to make sure that - // possible previous boot updates are consumed + // possible pre-boot updates are consumed sender.send(()).unwrap(); let result = indexes.insert(index_name, (index, update_fn, handle)); assert!(result.is_none(), "The index should not have been already open"); } - drop(rkv_read); - - Ok(Database { rkv, common_store, indexes_store, indexes: RwLock::new(indexes) }) + Ok(Database { env, common_store, indexes_store, indexes: RwLock::new(indexes) }) } pub fn open_index(&self, name: impl AsRef) -> Option { @@ -130,22 +119,20 @@ impl Database { match indexes_lock.entry(name.to_owned()) { Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists), Entry::Vacant(entry) => { - let rkv_lock = self.rkv.read().unwrap(); let (sender, receiver) = crossbeam_channel::bounded(100); - let index = store::create(&rkv_lock, name, sender)?; + let index = store::create(&self.env, name, sender)?; - let mut writer = rkv_lock.write()?; - let value = rkv::Value::Blob(&[]); - self.indexes_store.put(&mut writer, name, &value)?; + let mut writer = self.env.write_txn()?; + self.indexes_store.put(&mut writer, name, &())?; - let rkv_clone = self.rkv.clone(); + let env_clone = self.env.clone(); let index_clone = index.clone(); let no_update_fn = Arc::new(ArcSwapFn::empty()); let no_update_fn_clone = no_update_fn.clone(); let handle = thread::spawn(move || { - update_awaiter(receiver, rkv_clone, no_update_fn_clone, index_clone) + update_awaiter(receiver, env_clone, no_update_fn_clone, index_clone) }); writer.commit()?; @@ -181,7 +168,7 @@ impl Database { Ok(indexes.keys().cloned().collect()) } - pub fn common_store(&self) -> rkv::SingleStore { + pub fn common_store(&self) -> zlmdb::DynDatabase { self.common_store } } diff --git a/meilidb-core/src/error.rs b/meilidb-core/src/error.rs index 767337a80..3523b0ad5 100644 --- a/meilidb-core/src/error.rs +++ b/meilidb-core/src/error.rs @@ -12,7 +12,7 @@ pub enum Error { SchemaMissing, WordIndexMissing, MissingDocumentId, - Rkv(rkv::StoreError), + Zlmdb(zlmdb::Error), Fst(fst::Error), SerdeJson(SerdeJsonError), Bincode(bincode::Error), @@ -27,9 +27,9 @@ impl From for Error { } } -impl From for Error { - fn from(error: rkv::StoreError) -> Error { - Error::Rkv(error) +impl From for Error { + fn from(error: zlmdb::Error) -> Error { + Error::Zlmdb(error) } } @@ -79,7 +79,7 @@ impl fmt::Display for Error { SchemaMissing => write!(f, "this index does not have a schema"), WordIndexMissing => write!(f, "this index does not have a word index"), MissingDocumentId => write!(f, "document id is missing"), - Rkv(e) => write!(f, "rkv error; {}", e), + Zlmdb(e) => write!(f, "zlmdb error; {}", e), Fst(e) => write!(f, "fst error; {}", e), SerdeJson(e) => write!(f, "serde json error; {}", e), Bincode(e) => write!(f, "bincode error; {}", e), diff --git a/meilidb-core/src/query_builder.rs b/meilidb-core/src/query_builder.rs index ca9b43467..c00f67e62 100644 --- a/meilidb-core/src/query_builder.rs +++ b/meilidb-core/src/query_builder.rs @@ -125,7 +125,7 @@ fn multiword_rewrite_matches( } fn fetch_raw_documents( - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, automatons: &[Automaton], query_enhancer: &QueryEnhancer, searchables: Option<&ReorderedAttrs>, @@ -278,7 +278,7 @@ impl<'c, FI> QueryBuilder<'c, FI> { impl QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool { pub fn query( self, - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, query: &str, range: Range, ) -> MResult> @@ -414,7 +414,7 @@ where FI: Fn(DocumentId) -> bool, { pub fn query( self, - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, query: &str, range: Range, ) -> MResult> @@ -643,8 +643,8 @@ mod tests { } pub fn add_synonym(&mut self, word: &str, new: SetBuf<&str>) { - let rkv = self.database.rkv.read().unwrap(); - let mut writer = rkv.write().unwrap(); + let env = &self.database.env; + let mut writer = env.write_txn().unwrap(); let word = word.to_lowercase(); @@ -675,8 +675,8 @@ mod tests { let database = Database::open_or_create(&tempdir).unwrap(); let index = database.create_index("default").unwrap(); - let rkv = database.rkv.read().unwrap(); - let mut writer = rkv.write().unwrap(); + let env = &database.env; + let mut writer = env.write_txn().unwrap(); let mut words_fst = BTreeSet::new(); let mut postings_lists = HashMap::new(); @@ -720,7 +720,6 @@ mod tests { } writer.commit().unwrap(); - drop(rkv); TempDatabase { database, index, _tempdir: tempdir } } @@ -734,8 +733,8 @@ mod tests { ("apple", &[doc_char_index(0, 2, 2)][..]), ]); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "iphone from apple", 0..20).unwrap(); @@ -759,8 +758,8 @@ mod tests { store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "hello", 0..20).unwrap(); @@ -794,8 +793,8 @@ mod tests { store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"])); store.add_synonym("salut", SetBuf::from_dirty(vec!["hello"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "sal", 0..20).unwrap(); @@ -840,8 +839,8 @@ mod tests { store.add_synonym("salutation", SetBuf::from_dirty(vec!["hello"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "salutution", 0..20).unwrap(); @@ -878,8 +877,8 @@ mod tests { store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello", "salut"])); store.add_synonym("salut", SetBuf::from_dirty(vec!["hello", "bonjour"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "hello", 0..20).unwrap(); @@ -961,8 +960,8 @@ mod tests { store.add_synonym("NY", SetBuf::from_dirty(vec!["NYC", "new york", "new york city"])); store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway", 0..20).unwrap(); @@ -1033,8 +1032,8 @@ mod tests { store.add_synonym("NY", SetBuf::from_dirty(vec!["york new"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY", 0..20).unwrap(); @@ -1092,8 +1091,8 @@ mod tests { store.add_synonym("new york", SetBuf::from_dirty(vec!["NY"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway", 0..20).unwrap(); @@ -1152,8 +1151,8 @@ mod tests { store.add_synonym("NY", SetBuf::from_dirty(vec!["NYC", "new york", "new york city"])); store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway", 0..20).unwrap(); @@ -1228,8 +1227,8 @@ mod tests { store.add_synonym("NYC", SetBuf::from_dirty(vec!["NY", "new york", "new york city"])); store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway broken", 0..20).unwrap(); @@ -1311,8 +1310,8 @@ mod tests { store.add_synonym("new york city", SetBuf::from_dirty(vec![ "NYC", "NY", "new york" ])); store.add_synonym("underground train", SetBuf::from_dirty(vec![ "subway" ])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "new york underground train broken", 0..20).unwrap(); @@ -1407,8 +1406,8 @@ mod tests { store.add_synonym("new york", SetBuf::from_dirty(vec![ "new york city" ])); store.add_synonym("new york city", SetBuf::from_dirty(vec![ "new york" ])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "new york big ", 0..20).unwrap(); @@ -1446,8 +1445,8 @@ mod tests { store.add_synonym("NY", SetBuf::from_dirty(vec!["new york city story"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "NY subway ", 0..20).unwrap(); @@ -1496,8 +1495,8 @@ mod tests { store.add_synonym("new york city", SetBuf::from_dirty(vec!["NYC"])); store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "new york city long subway cool ", 0..20).unwrap(); @@ -1528,8 +1527,8 @@ mod tests { store.add_synonym("téléphone", SetBuf::from_dirty(vec!["iphone"])); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "telephone", 0..20).unwrap(); @@ -1590,8 +1589,8 @@ mod tests { ("case", &[doc_index(0, 1)][..]), ]); - let rkv = store.database.rkv.read().unwrap(); - let reader = rkv.read().unwrap(); + let env = &store.database.env; + let reader = env.read_txn().unwrap(); let builder = store.query_builder(); let results = builder.query(&reader, "i phone case", 0..20).unwrap(); diff --git a/meilidb-core/src/ranked_map.rs b/meilidb-core/src/ranked_map.rs index 0168883ff..d5bd15873 100644 --- a/meilidb-core/src/ranked_map.rs +++ b/meilidb-core/src/ranked_map.rs @@ -2,10 +2,13 @@ use std::io::{Read, Write}; use hashbrown::HashMap; use meilidb_schema::SchemaAttr; +use serde::{Serialize, Deserialize}; use crate::{DocumentId, Number}; #[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Serialize, Deserialize)] +#[serde(transparent)] pub struct RankedMap(HashMap<(DocumentId, SchemaAttr), Number>); impl RankedMap { diff --git a/meilidb-core/src/serde/deserializer.rs b/meilidb-core/src/serde/deserializer.rs index b9cd3a1d5..df5a8b502 100644 --- a/meilidb-core/src/serde/deserializer.rs +++ b/meilidb-core/src/serde/deserializer.rs @@ -14,7 +14,7 @@ use crate::DocumentId; #[derive(Debug)] pub enum DeserializerError { SerdeJson(SerdeJsonError), - Rkv(rkv::StoreError), + Zlmdb(zlmdb::Error), Custom(String), } @@ -28,7 +28,7 @@ impl fmt::Display for DeserializerError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { DeserializerError::SerdeJson(e) => write!(f, "serde json related error: {}", e), - DeserializerError::Rkv(e) => write!(f, "rkv related error: {}", e), + DeserializerError::Zlmdb(e) => write!(f, "zlmdb related error: {}", e), DeserializerError::Custom(s) => f.write_str(s), } } @@ -42,23 +42,21 @@ impl From for DeserializerError { } } -impl From for DeserializerError { - fn from(error: rkv::StoreError) -> DeserializerError { - DeserializerError::Rkv(error) +impl From for DeserializerError { + fn from(error: zlmdb::Error) -> DeserializerError { + DeserializerError::Zlmdb(error) } } -pub struct Deserializer<'a, R> { +pub struct Deserializer<'a> { pub document_id: DocumentId, - pub reader: &'a R, + pub reader: &'a zlmdb::RoTxn, pub documents_fields: DocumentsFields, pub schema: &'a Schema, pub attributes: Option<&'a HashSet>, } -impl<'de, 'a, 'b, R: 'a> de::Deserializer<'de> for &'b mut Deserializer<'a, R> -where R: rkv::Readable, -{ +impl<'de, 'a, 'b> de::Deserializer<'de> for &'b mut Deserializer<'a> { type Error = DeserializerError; fn deserialize_any(self, visitor: V) -> Result diff --git a/meilidb-core/src/serde/mod.rs b/meilidb-core/src/serde/mod.rs index 6846f14d2..dde014435 100644 --- a/meilidb-core/src/serde/mod.rs +++ b/meilidb-core/src/serde/mod.rs @@ -35,7 +35,7 @@ use crate::{DocumentId, ParseNumberError}; pub enum SerializerError { DocumentIdNotFound, InvalidDocumentIdType, - RkvError(rkv::StoreError), + Zlmdb(zlmdb::Error), SerdeJson(SerdeJsonError), ParseNumber(ParseNumberError), UnserializableType { type_name: &'static str }, @@ -59,7 +59,7 @@ impl fmt::Display for SerializerError { SerializerError::InvalidDocumentIdType => { f.write_str("document identifier can only be of type string or number") }, - SerializerError::RkvError(e) => write!(f, "rkv related error: {}", e), + SerializerError::Zlmdb(e) => write!(f, "zlmdb related error: {}", e), SerializerError::SerdeJson(e) => write!(f, "serde json error: {}", e), SerializerError::ParseNumber(e) => { write!(f, "error while trying to parse a number: {}", e) @@ -92,9 +92,9 @@ impl From for SerializerError { } } -impl From for SerializerError { - fn from(error: rkv::StoreError) -> SerializerError { - SerializerError::RkvError(error) +impl From for SerializerError { + fn from(error: zlmdb::Error) -> SerializerError { + SerializerError::Zlmdb(error) } } diff --git a/meilidb-core/src/store/docs_words.rs b/meilidb-core/src/store/docs_words.rs index 1254d032e..6cdf555d1 100644 --- a/meilidb-core/src/store/docs_words.rs +++ b/meilidb-core/src/store/docs_words.rs @@ -1,54 +1,51 @@ use std::sync::Arc; -use rkv::{Value, StoreError}; -use crate::{DocumentId, MResult}; +use zlmdb::types::{OwnedType, ByteSlice}; +use zlmdb::Result as ZResult; +use crate::DocumentId; +use super::BEU64; #[derive(Copy, Clone)] pub struct DocsWords { - pub(crate) docs_words: rkv::SingleStore, + pub(crate) docs_words: zlmdb::Database, ByteSlice>, } impl DocsWords { pub fn put_doc_words( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, document_id: DocumentId, words: &fst::Set, - ) -> Result<(), rkv::StoreError> + ) -> ZResult<()> { - let document_id_bytes = document_id.0.to_be_bytes(); + let document_id = BEU64::new(document_id.0); let bytes = words.as_fst().as_bytes(); - self.docs_words.put(writer, document_id_bytes, &Value::Blob(bytes)) + self.docs_words.put(writer, &document_id, bytes) } pub fn del_doc_words( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, document_id: DocumentId, - ) -> Result + ) -> ZResult { - let document_id_bytes = document_id.0.to_be_bytes(); - match self.docs_words.delete(writer, document_id_bytes) { - Ok(()) => Ok(true), - Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false), - Err(e) => Err(e), - } + let document_id = BEU64::new(document_id.0); + self.docs_words.delete(writer, &document_id) } - pub fn doc_words( + pub fn doc_words( &self, - reader: &T, + reader: &zlmdb::RoTxn, document_id: DocumentId, - ) -> MResult> + ) -> ZResult> { - let document_id_bytes = document_id.0.to_be_bytes(); - match self.docs_words.get(reader, document_id_bytes)? { - Some(Value::Blob(bytes)) => { + let document_id = BEU64::new(document_id.0); + match self.docs_words.get(reader, &document_id)? { + Some(bytes) => { let len = bytes.len(); let bytes = Arc::from(bytes); - let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; + let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); Ok(Some(fst::Set::from(fst))) }, - Some(value) => panic!("invalid type {:?}", value), None => Ok(None), } } diff --git a/meilidb-core/src/store/documents_fields.rs b/meilidb-core/src/store/documents_fields.rs index d3c28f990..7b528af97 100644 --- a/meilidb-core/src/store/documents_fields.rs +++ b/meilidb-core/src/store/documents_fields.rs @@ -1,102 +1,77 @@ -use std::convert::TryFrom; use meilidb_schema::SchemaAttr; +use zlmdb::types::{OwnedType, ByteSlice}; +use zlmdb::Result as ZResult; + use crate::DocumentId; -use super::{document_attribute_into_key, document_attribute_from_key}; +use super::DocumentAttrKey; #[derive(Copy, Clone)] pub struct DocumentsFields { - pub(crate) documents_fields: rkv::SingleStore, + pub(crate) documents_fields: zlmdb::Database, ByteSlice>, } impl DocumentsFields { pub fn put_document_field( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, document_id: DocumentId, attribute: SchemaAttr, value: &[u8], - ) -> Result<(), rkv::StoreError> + ) -> ZResult<()> { - let key = document_attribute_into_key(document_id, attribute); - self.documents_fields.put(writer, key, &rkv::Value::Blob(value)) + let key = DocumentAttrKey::new(document_id, attribute); + self.documents_fields.put(writer, &key, value) } pub fn del_all_document_fields( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, document_id: DocumentId, - ) -> Result + ) -> ZResult { - let document_id_bytes = document_id.0.to_be_bytes(); - let mut keys_to_delete = Vec::new(); - - // WARN we can not delete the keys using the iterator - // so we store them and delete them just after - let iter = self.documents_fields.iter_from(writer, document_id_bytes)?; - for result in iter { - let (key, _) = result?; - let array = TryFrom::try_from(key).unwrap(); - let (current_document_id, _) = document_attribute_from_key(array); - if current_document_id != document_id { break } - - keys_to_delete.push(key.to_owned()); - } - - let count = keys_to_delete.len(); - for key in keys_to_delete { - self.documents_fields.delete(writer, key)?; - } - - Ok(count) + let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); + let end = DocumentAttrKey::new(document_id, SchemaAttr::max()); + self.documents_fields.delete_range(writer, start..=end) } - pub fn document_attribute<'a>( + pub fn document_attribute<'txn>( &self, - reader: &'a impl rkv::Readable, + reader: &'txn zlmdb::RoTxn, document_id: DocumentId, attribute: SchemaAttr, - ) -> Result, rkv::StoreError> + ) -> ZResult> { - let key = document_attribute_into_key(document_id, attribute); - - match self.documents_fields.get(reader, key)? { - Some(rkv::Value::Blob(bytes)) => Ok(Some(bytes)), - Some(value) => panic!("invalid type {:?}", value), - None => Ok(None), - } + let key = DocumentAttrKey::new(document_id, attribute); + self.documents_fields.get(reader, &key) } - pub fn document_fields<'r, T: rkv::Readable>( + pub fn document_fields<'txn>( &self, - reader: &'r T, + reader: &'txn zlmdb::RoTxn, document_id: DocumentId, - ) -> Result, rkv::StoreError> + ) -> ZResult> { - let document_id_bytes = document_id.0.to_be_bytes(); - let iter = self.documents_fields.iter_from(reader, document_id_bytes)?; - Ok(DocumentFieldsIter { document_id, iter }) + let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); + let end = DocumentAttrKey::new(document_id, SchemaAttr::max()); + let iter = self.documents_fields.range(reader, start..=end)?; + Ok(DocumentFieldsIter { iter }) } } -pub struct DocumentFieldsIter<'r> { - document_id: DocumentId, - iter: rkv::store::single::Iter<'r>, +pub struct DocumentFieldsIter<'txn> { + iter: zlmdb::RoRange<'txn, OwnedType, ByteSlice>, } -impl<'r> Iterator for DocumentFieldsIter<'r> { - type Item = Result<(SchemaAttr, &'r [u8]), rkv::StoreError>; +impl<'txn> Iterator for DocumentFieldsIter<'txn> { + type Item = ZResult<(SchemaAttr, &'txn [u8])>; fn next(&mut self) -> Option { match self.iter.next() { - Some(Ok((key, Some(rkv::Value::Blob(bytes))))) => { - let array = TryFrom::try_from(key).unwrap(); - let (current_document_id, attr) = document_attribute_from_key(array); - if current_document_id != self.document_id { return None; } - + Some(Ok((key, bytes))) => { + let attr = SchemaAttr(key.attr.get()); Some(Ok((attr, bytes))) }, - Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data), - Some(Err(e)) => Some(Err(e)), + Some(Err(e)) => Some(Err(e.into())), None => None, } } diff --git a/meilidb-core/src/store/documents_fields_counts.rs b/meilidb-core/src/store/documents_fields_counts.rs index b68821976..c075f703b 100644 --- a/meilidb-core/src/store/documents_fields_counts.rs +++ b/meilidb-core/src/store/documents_fields_counts.rs @@ -1,163 +1,142 @@ -use std::convert::TryFrom; use meilidb_schema::SchemaAttr; +use zlmdb::types::OwnedType; +use zlmdb::Result as ZResult; use crate::DocumentId; -use super::{document_attribute_into_key, document_attribute_from_key}; +use super::DocumentAttrKey; #[derive(Copy, Clone)] pub struct DocumentsFieldsCounts { - pub(crate) documents_fields_counts: rkv::SingleStore, + pub(crate) documents_fields_counts: zlmdb::Database, OwnedType>, } impl DocumentsFieldsCounts { pub fn put_document_field_count( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, document_id: DocumentId, attribute: SchemaAttr, value: u64, - ) -> Result<(), rkv::StoreError> + ) -> ZResult<()> { - let key = document_attribute_into_key(document_id, attribute); - self.documents_fields_counts.put(writer, key, &rkv::Value::U64(value)) + let key = DocumentAttrKey::new(document_id, attribute); + self.documents_fields_counts.put(writer, &key, &value) } pub fn del_all_document_fields_counts( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, document_id: DocumentId, - ) -> Result + ) -> ZResult { - let mut keys_to_delete = Vec::new(); - - // WARN we can not delete the keys using the iterator - // so we store them and delete them just after - for result in self.document_fields_counts(writer, document_id)? { - let (attribute, _) = result?; - let key = document_attribute_into_key(document_id, attribute); - keys_to_delete.push(key); - } - - let count = keys_to_delete.len(); - for key in keys_to_delete { - self.documents_fields_counts.delete(writer, key)?; - } - - Ok(count) + let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); + let end = DocumentAttrKey::new(document_id, SchemaAttr::max()); + self.documents_fields_counts.delete_range(writer, start..=end) } pub fn document_field_count( &self, - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, document_id: DocumentId, attribute: SchemaAttr, - ) -> Result, rkv::StoreError> + ) -> ZResult> { - let key = document_attribute_into_key(document_id, attribute); - - match self.documents_fields_counts.get(reader, key)? { - Some(rkv::Value::U64(count)) => Ok(Some(count)), - Some(value) => panic!("invalid type {:?}", value), + let key = DocumentAttrKey::new(document_id, attribute); + match self.documents_fields_counts.get(reader, &key)? { + Some(count) => Ok(Some(count)), None => Ok(None), } } - pub fn document_fields_counts<'r, T: rkv::Readable>( + pub fn document_fields_counts<'txn>( &self, - reader: &'r T, + reader: &'txn zlmdb::RoTxn, document_id: DocumentId, - ) -> Result, rkv::StoreError> + ) -> ZResult> { - let document_id_bytes = document_id.0.to_be_bytes(); - let iter = self.documents_fields_counts.iter_from(reader, document_id_bytes)?; - Ok(DocumentFieldsCountsIter { document_id, iter }) + let start = DocumentAttrKey::new(document_id, SchemaAttr::min()); + let end = DocumentAttrKey::new(document_id, SchemaAttr::max()); + let iter = self.documents_fields_counts.range(reader, start..=end)?; + Ok(DocumentFieldsCountsIter { iter }) } - pub fn documents_ids<'r, T: rkv::Readable>( + pub fn documents_ids<'txn>( &self, - reader: &'r T, - ) -> Result, rkv::StoreError> + reader: &'txn zlmdb::RoTxn, + ) -> ZResult> { - let iter = self.documents_fields_counts.iter_start(reader)?; + let iter = self.documents_fields_counts.iter(reader)?; Ok(DocumentsIdsIter { last_seen_id: None, iter }) } - pub fn all_documents_fields_counts<'r, T: rkv::Readable>( + pub fn all_documents_fields_counts<'txn>( &self, - reader: &'r T, - ) -> Result, rkv::StoreError> + reader: &'txn zlmdb::RoTxn, + ) -> ZResult> { - let iter = self.documents_fields_counts.iter_start(reader)?; + let iter = self.documents_fields_counts.iter(reader)?; Ok(AllDocumentsFieldsCountsIter { iter }) } } -pub struct DocumentFieldsCountsIter<'r> { - document_id: DocumentId, - iter: rkv::store::single::Iter<'r>, +pub struct DocumentFieldsCountsIter<'txn> { + iter: zlmdb::RoRange<'txn, OwnedType, OwnedType>, } impl Iterator for DocumentFieldsCountsIter<'_> { - type Item = Result<(SchemaAttr, u64), rkv::StoreError>; + type Item = ZResult<(SchemaAttr, u64)>; fn next(&mut self) -> Option { match self.iter.next() { - Some(Ok((key, Some(rkv::Value::U64(count))))) => { - let array = TryFrom::try_from(key).unwrap(); - let (current_document_id, attr) = document_attribute_from_key(array); - if current_document_id != self.document_id { return None; } - + Some(Ok((key, count))) => { + let attr = SchemaAttr(key.attr.get()); Some(Ok((attr, count))) }, - Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data), - Some(Err(e)) => Some(Err(e)), + Some(Err(e)) => Some(Err(e.into())), None => None, } } } -pub struct DocumentsIdsIter<'r> { +pub struct DocumentsIdsIter<'txn> { last_seen_id: Option, - iter: rkv::store::single::Iter<'r>, + iter: zlmdb::RoIter<'txn, OwnedType, OwnedType>, } impl Iterator for DocumentsIdsIter<'_> { - type Item = Result; + type Item = ZResult; fn next(&mut self) -> Option { for result in &mut self.iter { match result { Ok((key, _)) => { - let array = TryFrom::try_from(key).unwrap(); - let (document_id, _) = document_attribute_from_key(array); + let document_id = DocumentId(key.docid.get()); if Some(document_id) != self.last_seen_id { self.last_seen_id = Some(document_id); return Some(Ok(document_id)) } }, - Err(e) => return Some(Err(e)), + Err(e) => return Some(Err(e.into())), } } - None } } -pub struct AllDocumentsFieldsCountsIter<'r> { - iter: rkv::store::single::Iter<'r>, +pub struct AllDocumentsFieldsCountsIter<'txn> { + iter: zlmdb::RoIter<'txn, OwnedType, OwnedType>, } impl<'r> Iterator for AllDocumentsFieldsCountsIter<'r> { - type Item = Result<(DocumentId, SchemaAttr, u64), rkv::StoreError>; + type Item = ZResult<(DocumentId, SchemaAttr, u64)>; fn next(&mut self) -> Option { match self.iter.next() { - Some(Ok((key, Some(rkv::Value::U64(count))))) => { - let array = TryFrom::try_from(key).unwrap(); - let (document_id, attr) = document_attribute_from_key(array); - Some(Ok((document_id, attr, count))) + Some(Ok((key, count))) => { + let docid = DocumentId(key.docid.get()); + let attr = SchemaAttr(key.attr.get()); + Some(Ok((docid, attr, count))) }, - Some(Ok((key, data))) => panic!("{:?}, {:?}", key, data), - Some(Err(e)) => Some(Err(e)), + Some(Err(e)) => Some(Err(e.into())), None => None, } } diff --git a/meilidb-core/src/store/main.rs b/meilidb-core/src/store/main.rs index 5beecdc5f..eb1921b07 100644 --- a/meilidb-core/src/store/main.rs +++ b/meilidb-core/src/store/main.rs @@ -1,9 +1,8 @@ use std::sync::Arc; -use std::convert::TryInto; - use meilidb_schema::Schema; -use rkv::Value; -use crate::{RankedMap, MResult}; +use zlmdb::types::{Str, OwnedType, ByteSlice, Serde}; +use zlmdb::Result as ZResult; +use crate::RankedMap; const CUSTOMS_KEY: &str = "customs-key"; const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents"; @@ -14,155 +13,80 @@ const WORDS_KEY: &str = "words"; #[derive(Copy, Clone)] pub struct Main { - pub(crate) main: rkv::SingleStore, + pub(crate) main: zlmdb::DynDatabase, } impl Main { - pub fn put_words_fst( - &self, - writer: &mut rkv::Writer, - fst: &fst::Set, - ) -> Result<(), rkv::StoreError> - { - let blob = rkv::Value::Blob(fst.as_fst().as_bytes()); - self.main.put(writer, WORDS_KEY, &blob) + pub fn put_words_fst(&self, writer: &mut zlmdb::RwTxn, fst: &fst::Set) -> ZResult<()> { + let bytes = fst.as_fst().as_bytes(); + self.main.put::(writer, WORDS_KEY, bytes) } - pub fn words_fst( - &self, - reader: &impl rkv::Readable, - ) -> MResult> - { - match self.main.get(reader, WORDS_KEY)? { - Some(Value::Blob(bytes)) => { + pub fn words_fst(&self, reader: &zlmdb::RoTxn) -> ZResult> { + match self.main.get::(reader, WORDS_KEY)? { + Some(bytes) => { let len = bytes.len(); let bytes = Arc::from(bytes); - let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; + let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); Ok(Some(fst::Set::from(fst))) }, - Some(value) => panic!("invalid type {:?}", value), None => Ok(None), } } - pub fn put_schema( - &self, - writer: &mut rkv::Writer, - schema: &Schema, - ) -> MResult<()> - { - let bytes = bincode::serialize(schema)?; - let blob = Value::Blob(&bytes[..]); - self.main.put(writer, SCHEMA_KEY, &blob)?; - Ok(()) + pub fn put_schema(&self, writer: &mut zlmdb::RwTxn, schema: &Schema) -> ZResult<()> { + self.main.put::>(writer, SCHEMA_KEY, schema) } - pub fn schema( - &self, - reader: &impl rkv::Readable, - ) -> MResult> - { - match self.main.get(reader, SCHEMA_KEY)? { - Some(Value::Blob(bytes)) => { - let schema = bincode::deserialize_from(bytes)?; - Ok(Some(schema)) - }, - Some(value) => panic!("invalid type {:?}", value), - None => Ok(None), - } + pub fn schema(&self, reader: &zlmdb::RoTxn) -> ZResult> { + self.main.get::>(reader, SCHEMA_KEY) } - pub fn put_ranked_map( - &self, - writer: &mut rkv::Writer, - ranked_map: &RankedMap, - ) -> MResult<()> - { - let mut bytes = Vec::new(); - ranked_map.write_to_bin(&mut bytes)?; - let blob = Value::Blob(&bytes[..]); - self.main.put(writer, RANKED_MAP_KEY, &blob)?; - Ok(()) + pub fn put_ranked_map(&self, writer: &mut zlmdb::RwTxn, ranked_map: &RankedMap) -> ZResult<()> { + self.main.put::>(writer, RANKED_MAP_KEY, &ranked_map) } - pub fn ranked_map( - &self, - reader: &impl rkv::Readable, - ) -> MResult> - { - match self.main.get(reader, RANKED_MAP_KEY)? { - Some(Value::Blob(bytes)) => { - let ranked_map = RankedMap::read_from_bin(bytes)?; - Ok(Some(ranked_map)) - }, - Some(value) => panic!("invalid type {:?}", value), - None => Ok(None), - } + pub fn ranked_map(&self, reader: &zlmdb::RoTxn) -> ZResult> { + self.main.get::>(reader, RANKED_MAP_KEY) } - pub fn put_synonyms_fst( - &self, - writer: &mut rkv::Writer, - fst: &fst::Set, - ) -> MResult<()> - { - let blob = rkv::Value::Blob(fst.as_fst().as_bytes()); - Ok(self.main.put(writer, SYNONYMS_KEY, &blob)?) + pub fn put_synonyms_fst(&self, writer: &mut zlmdb::RwTxn, fst: &fst::Set) -> ZResult<()> { + let bytes = fst.as_fst().as_bytes(); + self.main.put::(writer, SYNONYMS_KEY, bytes) } - pub fn synonyms_fst( - &self, - reader: &impl rkv::Readable, - ) -> MResult> - { - match self.main.get(reader, SYNONYMS_KEY)? { - Some(Value::Blob(bytes)) => { + pub fn synonyms_fst(&self, reader: &zlmdb::RoTxn) -> ZResult> { + match self.main.get::(reader, SYNONYMS_KEY)? { + Some(bytes) => { let len = bytes.len(); let bytes = Arc::from(bytes); - let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; + let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); Ok(Some(fst::Set::from(fst))) }, - Some(value) => panic!("invalid type {:?}", value), None => Ok(None), } } - pub fn put_number_of_documents u64>( - &self, - writer: &mut rkv::Writer, - f: F, - ) -> Result + pub fn put_number_of_documents(&self, writer: &mut zlmdb::RwTxn, f: F) -> ZResult + where F: Fn(u64) -> u64, { let new = self.number_of_documents(writer).map(f)?; - self.main.put(writer, NUMBER_OF_DOCUMENTS_KEY, &Value::Blob(&new.to_be_bytes()))?; + self.main.put::>(writer, NUMBER_OF_DOCUMENTS_KEY, &new)?; Ok(new) } - pub fn number_of_documents( - &self, - reader: &impl rkv::Readable, - ) -> Result - { - match self.main.get(reader, NUMBER_OF_DOCUMENTS_KEY)? { - Some(Value::Blob(bytes)) => { - let array = bytes.try_into().unwrap(); - Ok(u64::from_be_bytes(array)) - }, - Some(value) => panic!("invalid type {:?}", value), + pub fn number_of_documents(&self, reader: &zlmdb::RwTxn) -> ZResult { + match self.main.get::>(reader, NUMBER_OF_DOCUMENTS_KEY)? { + Some(value) => Ok(value), None => Ok(0), } } - pub fn put_customs(&self, writer: &mut rkv::Writer, customs: &[u8]) -> MResult<()> { - self.main.put(writer, CUSTOMS_KEY, &Value::Blob(customs))?; - Ok(()) + pub fn put_customs(&self, writer: &mut zlmdb::RwTxn, customs: &[u8]) -> ZResult<()> { + self.main.put::(writer, CUSTOMS_KEY, customs) } - pub fn customs<'t>(&self, reader: &'t impl rkv::Readable) -> MResult> { - match self.main.get(reader, CUSTOMS_KEY)? { - Some(Value::Blob(bytes)) => Ok(Some(bytes)), - Some(value) => panic!("invalid type {:?}", value), - None => Ok(None), - } + pub fn customs<'txn>(&self, reader: &'txn zlmdb::RoTxn) -> ZResult> { + self.main.get::(reader, CUSTOMS_KEY) } } diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index 6318284ca..d87671f32 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -17,42 +17,28 @@ pub use self::updates::Updates; pub use self::updates_results::UpdatesResults; use std::collections::HashSet; -use std::convert::TryFrom; use meilidb_schema::{Schema, SchemaAttr}; use serde::de; +use zerocopy::{AsBytes, FromBytes}; +use zlmdb::Result as ZResult; use crate::criterion::Criteria; use crate::serde::Deserializer; use crate::{update, query_builder::QueryBuilder, DocumentId, MResult, Error}; -fn aligned_to(bytes: &[u8], align: usize) -> bool { - (bytes as *const _ as *const () as usize) % align == 0 -} +type BEU64 = zerocopy::U64; +type BEU16 = zerocopy::U16; -fn document_attribute_into_key(document_id: DocumentId, attribute: SchemaAttr) -> [u8; 10] { - let document_id_bytes = document_id.0.to_be_bytes(); - let attr_bytes = attribute.0.to_be_bytes(); +#[derive(Debug, Copy, Clone)] +#[derive(AsBytes, FromBytes)] +#[repr(C)] +pub struct DocumentAttrKey { docid: BEU64, attr: BEU16 } - let mut key = [0u8; 10]; - key[0..8].copy_from_slice(&document_id_bytes); - key[8..10].copy_from_slice(&attr_bytes); - - key -} - -fn document_attribute_from_key(key: [u8; 10]) -> (DocumentId, SchemaAttr) { - let document_id = { - let array = TryFrom::try_from(&key[0..8]).unwrap(); - DocumentId(u64::from_be_bytes(array)) - }; - - let schema_attr = { - let array = TryFrom::try_from(&key[8..8+2]).unwrap(); - SchemaAttr(u16::from_be_bytes(array)) - }; - - (document_id, schema_attr) +impl DocumentAttrKey { + fn new(docid: DocumentId, attr: SchemaAttr) -> DocumentAttrKey { + DocumentAttrKey { docid: BEU64::new(docid.0), attr: BEU16::new(attr.0) } + } } fn main_name(name: &str) -> String { @@ -102,9 +88,9 @@ pub struct Index { } impl Index { - pub fn document( + pub fn document( &self, - reader: &R, + reader: &zlmdb::RoTxn, attributes: Option<&HashSet<&str>>, document_id: DocumentId, ) -> MResult> @@ -130,9 +116,9 @@ impl Index { Ok(T::deserialize(&mut deserializer).map(Some)?) } - pub fn document_attribute( + pub fn document_attribute( &self, - reader: &R, + reader: &zlmdb::RoTxn, document_id: DocumentId, attribute: SchemaAttr, ) -> MResult> @@ -144,12 +130,12 @@ impl Index { } } - pub fn schema_update(&self, writer: &mut rkv::Writer, schema: Schema) -> MResult { + pub fn schema_update(&self, writer: &mut zlmdb::RwTxn, schema: Schema) -> MResult { let _ = self.updates_notifier.send(()); update::push_schema_update(writer, self.updates, self.updates_results, schema) } - pub fn customs_update(&self, writer: &mut rkv::Writer, customs: Vec) -> MResult { + pub fn customs_update(&self, writer: &mut zlmdb::RwTxn, customs: Vec) -> ZResult { let _ = self.updates_notifier.send(()); update::push_customs_update(writer, self.updates, self.updates_results, customs) } @@ -186,16 +172,16 @@ impl Index { ) } - pub fn current_update_id(&self, reader: &T) -> MResult> { + pub fn current_update_id(&self, reader: &zlmdb::RoTxn) -> MResult> { match self.updates.last_update_id(reader)? { Some((id, _)) => Ok(Some(id)), None => Ok(None), } } - pub fn update_status( + pub fn update_status( &self, - reader: &T, + reader: &zlmdb::RoTxn, update_id: u64, ) -> MResult { @@ -228,31 +214,10 @@ impl Index { } pub fn create( - env: &rkv::Rkv, + env: &zlmdb::Env, name: &str, updates_notifier: crossbeam_channel::Sender<()>, -) -> Result -{ - open_options(env, name, rkv::StoreOptions::create(), updates_notifier) -} - -pub fn open( - env: &rkv::Rkv, - name: &str, - updates_notifier: crossbeam_channel::Sender<()>, -) -> Result -{ - let mut options = rkv::StoreOptions::default(); - options.create = false; - open_options(env, name, options, updates_notifier) -} - -fn open_options( - env: &rkv::Rkv, - name: &str, - options: rkv::StoreOptions, - updates_notifier: crossbeam_channel::Sender<()>, -) -> Result +) -> MResult { // create all the store names let main_name = main_name(name); @@ -265,14 +230,14 @@ fn open_options( let updates_results_name = updates_results_name(name); // open all the stores - let main = env.open_single(main_name.as_str(), options)?; - let postings_lists = env.open_single(postings_lists_name.as_str(), options)?; - let documents_fields = env.open_single(documents_fields_name.as_str(), options)?; - let documents_fields_counts = env.open_single(documents_fields_counts_name.as_str(), options)?; - let synonyms = env.open_single(synonyms_name.as_str(), options)?; - let docs_words = env.open_single(docs_words_name.as_str(), options)?; - let updates = env.open_single(updates_name.as_str(), options)?; - let updates_results = env.open_single(updates_results_name.as_str(), options)?; + let main = env.create_dyn_database(Some(&main_name))?; + let postings_lists = env.create_database(Some(&postings_lists_name))?; + let documents_fields = env.create_database(Some(&documents_fields_name))?; + let documents_fields_counts = env.create_database(Some(&documents_fields_counts_name))?; + let synonyms = env.create_database(Some(&synonyms_name))?; + let docs_words = env.create_database(Some(&docs_words_name))?; + let updates = env.create_database(Some(&updates_name))?; + let updates_results = env.create_database(Some(&updates_results_name))?; Ok(Index { main: Main { main }, @@ -286,3 +251,66 @@ fn open_options( updates_notifier, }) } + +pub fn open( + env: &zlmdb::Env, + name: &str, + updates_notifier: crossbeam_channel::Sender<()>, +) -> MResult> +{ + // create all the store names + let main_name = main_name(name); + let postings_lists_name = postings_lists_name(name); + let documents_fields_name = documents_fields_name(name); + let documents_fields_counts_name = documents_fields_counts_name(name); + let synonyms_name = synonyms_name(name); + let docs_words_name = docs_words_name(name); + let updates_name = updates_name(name); + let updates_results_name = updates_results_name(name); + + // open all the stores + let main = match env.open_dyn_database(Some(&main_name))? { + Some(main) => main, + None => return Ok(None), + }; + let postings_lists = match env.open_database(Some(&postings_lists_name))? { + Some(postings_lists) => postings_lists, + None => return Ok(None), + }; + let documents_fields = match env.open_database(Some(&documents_fields_name))? { + Some(documents_fields) => documents_fields, + None => return Ok(None), + }; + let documents_fields_counts = match env.open_database(Some(&documents_fields_counts_name))? { + Some(documents_fields_counts) => documents_fields_counts, + None => return Ok(None), + }; + let synonyms = match env.open_database(Some(&synonyms_name))? { + Some(synonyms) => synonyms, + None => return Ok(None), + }; + let docs_words = match env.open_database(Some(&docs_words_name))? { + Some(docs_words) => docs_words, + None => return Ok(None), + }; + let updates = match env.open_database(Some(&updates_name))? { + Some(updates) => updates, + None => return Ok(None), + }; + let updates_results = match env.open_database(Some(&updates_results_name))? { + Some(updates_results) => updates_results, + None => return Ok(None), + }; + + Ok(Some(Index { + main: Main { main }, + postings_lists: PostingsLists { postings_lists }, + documents_fields: DocumentsFields { documents_fields }, + documents_fields_counts: DocumentsFieldsCounts { documents_fields_counts }, + synonyms: Synonyms { synonyms }, + docs_words: DocsWords { docs_words }, + updates: Updates { updates }, + updates_results: UpdatesResults { updates_results }, + updates_notifier, + })) +} diff --git a/meilidb-core/src/store/postings_lists.rs b/meilidb-core/src/store/postings_lists.rs index 68eb81cbc..eb917df5b 100644 --- a/meilidb-core/src/store/postings_lists.rs +++ b/meilidb-core/src/store/postings_lists.rs @@ -1,81 +1,39 @@ use std::borrow::Cow; -use std::{mem, ptr}; - -use zerocopy::{AsBytes, LayoutVerified}; -use rkv::StoreError; - +use sdset::{Set, SetBuf}; +use zlmdb::types::{ByteSlice, CowSlice}; +use zlmdb::Result as ZResult; use crate::DocIndex; -use crate::store::aligned_to; #[derive(Copy, Clone)] pub struct PostingsLists { - pub(crate) postings_lists: rkv::SingleStore, + pub(crate) postings_lists: zlmdb::Database>, } impl PostingsLists { pub fn put_postings_list( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, word: &[u8], - words_indexes: &[DocIndex], - ) -> Result<(), rkv::StoreError> + words_indexes: &Set, + ) -> ZResult<()> { - let blob = rkv::Value::Blob(words_indexes.as_bytes()); - self.postings_lists.put(writer, word, &blob) + self.postings_lists.put(writer, word, words_indexes) } - pub fn del_postings_list( - &self, - writer: &mut rkv::Writer, - word: &[u8], - ) -> Result - { - match self.postings_lists.delete(writer, word) { - Ok(()) => Ok(true), - Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false), - Err(e) => Err(e), - } + pub fn del_postings_list(&self, writer: &mut zlmdb::RwTxn, word: &[u8]) -> ZResult { + self.postings_lists.delete(writer, word) } - pub fn postings_list<'a>( + pub fn postings_list<'txn>( &self, - reader: &'a impl rkv::Readable, + reader: &'txn zlmdb::RoTxn, word: &[u8], - ) -> Result>>, rkv::StoreError> + ) -> ZResult>>> { - let bytes = match self.postings_lists.get(reader, word)? { - Some(rkv::Value::Blob(bytes)) => bytes, - Some(value) => panic!("invalid type {:?}", value), - None => return Ok(None), - }; - - match LayoutVerified::new_slice(bytes) { - Some(layout) => { - let set = sdset::Set::new(layout.into_slice()).unwrap(); - Ok(Some(Cow::Borrowed(set))) - }, - None => { - let len = bytes.len(); - let elem_size = mem::size_of::(); - - // ensure that it is the alignment that is wrong - // and the length is valid - if len % elem_size == 0 && !aligned_to(bytes, mem::align_of::()) { - let elems = len / elem_size; - let mut vec = Vec::::with_capacity(elems); - - unsafe { - let dst = vec.as_mut_ptr() as *mut u8; - ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len); - vec.set_len(elems); - } - - let setbuf = sdset::SetBuf::new(vec).unwrap(); - return Ok(Some(Cow::Owned(setbuf))) - } - - Ok(None) - }, + match self.postings_lists.get(reader, word)? { + Some(Cow::Borrowed(slice)) => Ok(Some(Cow::Borrowed(Set::new_unchecked(slice)))), + Some(Cow::Owned(vec)) => Ok(Some(Cow::Owned(SetBuf::new_unchecked(vec)))), + None => Ok(None), } } } diff --git a/meilidb-core/src/store/synonyms.rs b/meilidb-core/src/store/synonyms.rs index c00f891ce..ca032c223 100644 --- a/meilidb-core/src/store/synonyms.rs +++ b/meilidb-core/src/store/synonyms.rs @@ -1,51 +1,36 @@ use std::sync::Arc; -use rkv::StoreError; -use crate::error::MResult; +use zlmdb::types::ByteSlice; +use zlmdb::Result as ZResult; #[derive(Copy, Clone)] pub struct Synonyms { - pub(crate) synonyms: rkv::SingleStore, + pub(crate) synonyms: zlmdb::Database, } impl Synonyms { pub fn put_synonyms( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, word: &[u8], synonyms: &fst::Set, - ) -> Result<(), rkv::StoreError> + ) -> ZResult<()> { - let blob = rkv::Value::Blob(synonyms.as_fst().as_bytes()); - self.synonyms.put(writer, word, &blob) + let bytes = synonyms.as_fst().as_bytes(); + self.synonyms.put(writer, word, bytes) } - pub fn del_synonyms( - &self, - writer: &mut rkv::Writer, - word: &[u8], - ) -> Result - { - match self.synonyms.delete(writer, word) { - Ok(()) => Ok(true), - Err(StoreError::LmdbError(lmdb::Error::NotFound)) => Ok(false), - Err(e) => Err(e), - } + pub fn del_synonyms(&self, writer: &mut zlmdb::RwTxn, word: &[u8]) -> ZResult { + self.synonyms.delete(writer, word) } - pub fn synonyms( - &self, - reader: &impl rkv::Readable, - word: &[u8], - ) -> MResult> - { + pub fn synonyms(&self, reader: &zlmdb::RoTxn, word: &[u8]) -> ZResult> { match self.synonyms.get(reader, word)? { - Some(rkv::Value::Blob(bytes)) => { + Some(bytes) => { let len = bytes.len(); let bytes = Arc::from(bytes); - let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len)?; + let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); Ok(Some(fst::Set::from(fst))) }, - Some(value) => panic!("invalid type {:?}", value), None => Ok(None), } } diff --git a/meilidb-core/src/store/updates.rs b/meilidb-core/src/store/updates.rs index 0f5ce2629..14c839adb 100644 --- a/meilidb-core/src/store/updates.rs +++ b/meilidb-core/src/store/updates.rs @@ -1,100 +1,56 @@ -use std::convert::TryInto; -use rkv::Value; -use crate::{update::Update, MResult}; +use zlmdb::types::{OwnedType, Serde}; +use zlmdb::Result as ZResult; +use crate::update::Update; +use super::BEU64; #[derive(Copy, Clone)] pub struct Updates { - pub(crate) updates: rkv::SingleStore, + pub(crate) updates: zlmdb::Database, Serde>, } impl Updates { - // TODO we should use the MDB_LAST op but - // it is not exposed by the rkv library - pub fn last_update_id<'a>( - &self, - reader: &'a impl rkv::Readable, - ) -> Result>)>, rkv::StoreError> - { - let mut last = None; - let iter = self.updates.iter_start(reader)?; - for result in iter { - let (key, data) = result?; - last = Some((key, data)); + // TODO do not trigger deserialize if possible + pub fn last_update_id(&self, reader: &zlmdb::RoTxn) -> ZResult> { + match self.updates.last(reader)? { + Some((key, data)) => Ok(Some((key.get(), data))), + None => Ok(None), } - - let (last_key, last_data) = match last { - Some(entry) => entry, - None => return Ok(None), - }; - - let array = last_key.try_into().unwrap(); - let number = u64::from_be_bytes(array); - - Ok(Some((number, last_data))) } - fn first_update_id<'a>( - &self, - reader: &'a impl rkv::Readable, - ) -> Result>)>, rkv::StoreError> - { - let mut iter = self.updates.iter_start(reader)?; - let (first_key, first_data) = match iter.next() { - Some(result) => result?, - None => return Ok(None), - }; - - let array = first_key.try_into().unwrap(); - let number = u64::from_be_bytes(array); - - Ok(Some((number, first_data))) + // TODO do not trigger deserialize if possible + fn first_update_id(&self, reader: &zlmdb::RoTxn) -> ZResult> { + match self.updates.first(reader)? { + Some((key, data)) => Ok(Some((key.get(), data))), + None => Ok(None), + } } - pub fn contains( - &self, - reader: &impl rkv::Readable, - update_id: u64, - ) -> Result - { - let update_id_bytes = update_id.to_be_bytes(); - self.updates.get(reader, update_id_bytes).map(|v| v.is_some()) + // TODO do not trigger deserialize if possible + pub fn contains(&self, reader: &zlmdb::RoTxn, update_id: u64) -> ZResult { + let update_id = BEU64::new(update_id); + self.updates.get(reader, &update_id).map(|v| v.is_some()) } pub fn put_update( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, update_id: u64, update: &Update, - ) -> MResult<()> + ) -> ZResult<()> { - let update_id_bytes = update_id.to_be_bytes(); - let update = serde_json::to_vec(&update)?; - let blob = Value::Blob(&update); - self.updates.put(writer, update_id_bytes, &blob)?; - Ok(()) + // TODO prefer using serde_json? + let update_id = BEU64::new(update_id); + self.updates.put(writer, &update_id, update) } - pub fn pop_front( - &self, - writer: &mut rkv::Writer, - ) -> MResult> - { - let (first_id, first_data) = match self.first_update_id(writer)? { - Some(entry) => entry, - None => return Ok(None), - }; - - match first_data { - Some(Value::Blob(bytes)) => { - let update = serde_json::from_slice(&bytes)?; - // remove it from the database now - let first_id_bytes = first_id.to_be_bytes(); - self.updates.delete(writer, first_id_bytes)?; - - Ok(Some((first_id, update))) + pub fn pop_front(&self, writer: &mut zlmdb::RwTxn) -> ZResult> { + match self.first_update_id(writer)? { + Some((update_id, update)) => { + let key = BEU64::new(update_id); + self.updates.delete(writer, &key)?; + Ok(Some((update_id, update))) }, - Some(value) => panic!("invalid type {:?}", value), - None => Ok(None), + None => Ok(None) } } } diff --git a/meilidb-core/src/store/updates_results.rs b/meilidb-core/src/store/updates_results.rs index 9b0c2d435..8deeb2f5b 100644 --- a/meilidb-core/src/store/updates_results.rs +++ b/meilidb-core/src/store/updates_results.rs @@ -1,67 +1,39 @@ -use std::convert::TryInto; -use rkv::Value; -use crate::{update::UpdateResult, MResult}; +use zlmdb::types::{OwnedType, Serde}; +use zlmdb::Result as ZResult; +use crate::update::UpdateResult; +use super::BEU64; #[derive(Copy, Clone)] pub struct UpdatesResults { - pub(crate) updates_results: rkv::SingleStore, + pub(crate) updates_results: zlmdb::Database, Serde>, } impl UpdatesResults { - // TODO we should use the MDB_LAST op but - // it is not exposed by the rkv library - pub fn last_update_id<'a>( - &self, - reader: &'a impl rkv::Readable, - ) -> Result>)>, rkv::StoreError> - { - let mut last = None; - let iter = self.updates_results.iter_start(reader)?; - for result in iter { - let (key, data) = result?; - last = Some((key, data)); + pub fn last_update_id(&self, reader: &zlmdb::RoTxn) -> ZResult> { + match self.updates_results.last(reader)? { + Some((key, data)) => Ok(Some((key.get(), data))), + None => Ok(None), } - - let (last_key, last_data) = match last { - Some(entry) => entry, - None => return Ok(None), - }; - - let array = last_key.try_into().unwrap(); - let number = u64::from_be_bytes(array); - - Ok(Some((number, last_data))) } pub fn put_update_result( &self, - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, update_id: u64, update_result: &UpdateResult, - ) -> MResult<()> + ) -> ZResult<()> { - let update_id_bytes = update_id.to_be_bytes(); - let update_result = bincode::serialize(&update_result)?; - let blob = Value::Blob(&update_result); - self.updates_results.put(writer, update_id_bytes, &blob)?; - Ok(()) + let update_id = BEU64::new(update_id); + self.updates_results.put(writer, &update_id, update_result) } pub fn update_result( &self, - reader: &impl rkv::Readable, + reader: &zlmdb::RoTxn, update_id: u64, - ) -> MResult> + ) -> ZResult> { - let update_id_bytes = update_id.to_be_bytes(); - - match self.updates_results.get(reader, update_id_bytes)? { - Some(Value::Blob(bytes)) => { - let update_result = bincode::deserialize(&bytes)?; - Ok(Some(update_result)) - }, - Some(value) => panic!("invalid type {:?}", value), - None => Ok(None), - } + let update_id = BEU64::new(update_id); + self.updates_results.get(reader, &update_id) } } diff --git a/meilidb-core/src/update/customs_update.rs b/meilidb-core/src/update/customs_update.rs index 5334b6659..2303b8689 100644 --- a/meilidb-core/src/update/customs_update.rs +++ b/meilidb-core/src/update/customs_update.rs @@ -1,21 +1,22 @@ +use zlmdb::Result as ZResult; use crate::update::{Update, next_update_id}; -use crate::{store, MResult}; +use crate::store; pub fn apply_customs_update( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, main_store: store::Main, customs: &[u8], -) -> MResult<()> +) -> ZResult<()> { main_store.put_customs(writer, customs) } pub fn push_customs_update( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, customs: Vec, -) -> MResult +) -> ZResult { let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index cb662617c..e57812f0d 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -36,7 +36,7 @@ impl DocumentsAddition { self.documents.push(document); } - pub fn finalize(self, writer: &mut rkv::Writer) -> MResult + pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult where D: serde::Serialize { let _ = self.updates_notifier.send(()); @@ -57,7 +57,7 @@ impl Extend for DocumentsAddition { } pub fn push_documents_addition( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: Vec, @@ -79,7 +79,7 @@ pub fn push_documents_addition( } pub fn apply_documents_addition( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs index b56cebeb6..72c03f741 100644 --- a/meilidb-core/src/update/documents_deletion.rs +++ b/meilidb-core/src/update/documents_deletion.rs @@ -49,7 +49,7 @@ impl DocumentsDeletion { Ok(()) } - pub fn finalize(self, writer: &mut rkv::Writer) -> MResult { + pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult { let _ = self.updates_notifier.send(()); let update_id = push_documents_deletion( writer, @@ -68,7 +68,7 @@ impl Extend for DocumentsDeletion { } pub fn push_documents_deletion( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, deletion: Vec, @@ -83,7 +83,7 @@ pub fn push_documents_deletion( } pub fn apply_documents_deletion( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, main_store: store::Main, documents_fields_store: store::DocumentsFields, documents_fields_counts_store: store::DocumentsFieldsCounts, diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 7d4e349c0..bf3ef831f 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -18,11 +18,12 @@ use std::cmp; use log::debug; use serde::{Serialize, Deserialize}; +use zlmdb::Result as ZResult; use crate::{store, MResult, DocumentId, RankedMap}; use meilidb_schema::Schema; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum Update { Schema(Schema), Customs(Vec), @@ -62,8 +63,8 @@ pub enum UpdateStatus { Unknown, } -pub fn update_status( - reader: &T, +pub fn update_status( + reader: &zlmdb::RoTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, update_id: u64, @@ -82,10 +83,10 @@ pub fn update_status( } pub fn next_update_id( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, -) -> MResult +) -> ZResult { let last_update_id = updates_store.last_update_id(writer)?; let last_update_id = last_update_id.map(|(n, _)| n); @@ -99,7 +100,7 @@ pub fn next_update_id( Ok(new_update_id) } -pub fn update_task(writer: &mut rkv::Writer, index: store::Index) -> MResult> { +pub fn update_task(writer: &mut zlmdb::RwTxn, index: store::Index) -> MResult> { let (update_id, update) = match index.updates.pop_front(writer)? { Some(value) => value, None => return Ok(None), @@ -120,7 +121,7 @@ pub fn update_task(writer: &mut rkv::Writer, index: store::Index) -> MResult MResult<()> @@ -12,11 +12,11 @@ pub fn apply_schema_update( return Err(UnsupportedOperation::SchemaAlreadyExists.into()) } - main_store.put_schema(writer, new_schema) + main_store.put_schema(writer, new_schema).map_err(Into::into) } pub fn push_schema_update( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, schema: Schema, diff --git a/meilidb-core/src/update/synonyms_addition.rs b/meilidb-core/src/update/synonyms_addition.rs index ad2f03f23..d6219eac6 100644 --- a/meilidb-core/src/update/synonyms_addition.rs +++ b/meilidb-core/src/update/synonyms_addition.rs @@ -39,7 +39,7 @@ impl SynonymsAddition { self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives); } - pub fn finalize(self, writer: &mut rkv::Writer) -> MResult { + pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult { let _ = self.updates_notifier.send(()); let update_id = push_synonyms_addition( writer, @@ -52,7 +52,7 @@ impl SynonymsAddition { } pub fn push_synonyms_addition( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, addition: BTreeMap>, @@ -67,7 +67,7 @@ pub fn push_synonyms_addition( } pub fn apply_synonyms_addition( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, main_store: store::Main, synonyms_store: store::Synonyms, addition: BTreeMap>, diff --git a/meilidb-core/src/update/synonyms_deletion.rs b/meilidb-core/src/update/synonyms_deletion.rs index fe8baefc6..43fc848ee 100644 --- a/meilidb-core/src/update/synonyms_deletion.rs +++ b/meilidb-core/src/update/synonyms_deletion.rs @@ -49,7 +49,7 @@ impl SynonymsDeletion { } } - pub fn finalize(self, writer: &mut rkv::Writer) -> MResult { + pub fn finalize(self, writer: &mut zlmdb::RwTxn) -> MResult { let _ = self.updates_notifier.send(()); let update_id = push_synonyms_deletion( writer, @@ -62,7 +62,7 @@ impl SynonymsDeletion { } pub fn push_synonyms_deletion( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, updates_store: store::Updates, updates_results_store: store::UpdatesResults, deletion: BTreeMap>>, @@ -77,7 +77,7 @@ pub fn push_synonyms_deletion( } pub fn apply_synonyms_deletion( - writer: &mut rkv::Writer, + writer: &mut zlmdb::RwTxn, main_store: store::Main, synonyms_store: store::Synonyms, deletion: BTreeMap>>,