diff --git a/Cargo.lock b/Cargo.lock index 6ae7d16ea..ebdee36fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,9 +435,9 @@ checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3" [[package]] name = "either" -version = "1.5.3" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "fake-simd" @@ -630,7 +630,7 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "grenad" version = "0.1.0" -source = "git+https://github.com/Kerollmops/grenad.git?rev=ce3517f#ce3517fdbcf7ff0e1e703a4abbc623f69f29d8e0" +source = "git+https://github.com/Kerollmops/grenad.git?rev=3eb7ad9#3eb7ad9fff06c7b4d3286a3e37e40eea12d695de" dependencies = [ "byteorder", "flate2", @@ -1796,6 +1796,7 @@ version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" dependencies = [ + "indexmap", "itoa", "ryu", "serde", diff --git a/Cargo.toml b/Cargo.toml index 27f4d3a2f..ef0a05e22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ csv = "1.1.3" flate2 = "1.0.17" fst = "0.4.4" fxhash = "0.2.1" -grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "ce3517f" } +grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3eb7ad9" } heed = { version = "0.10.0", default-features = false, features = ["lmdb"] } human_format = "1.0.3" indexmap = { version = "1.6.0", features = ["serde-1"] } @@ -27,7 +27,7 @@ once_cell = "1.4.0" rayon = "1.3.1" ringtail = "0.3.0" roaring = "0.6.1" -serde_json = "1.0.59" +serde_json = { version = "1.0.59", features = ["preserve_order"] } slice-group-by = "0.2.6" smallstr = { version = "0.2.0", features = ["serde"] } smallvec = "1.4.0" diff --git a/src/lib.rs b/src/lib.rs index 9ac492143..edeee2563 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,10 @@ pub mod subcommand; pub mod tokenizer; pub mod update; +use std::borrow::Cow; use std::collections::HashMap; use std::hash::BuildHasherDefault; + use fxhash::{FxHasher32, FxHasher64}; pub use self::criterion::{Criterion, default_criteria}; @@ -34,3 +36,5 @@ pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>; pub type DocumentId = u32; pub type Attribute = u32; pub type Position = u32; + +type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>; diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index bec82b01e..9869f0edc 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -18,6 +18,7 @@ use self::merge_function::{ }; pub use self::transform::{Transform, TransformOutput}; +use crate::MergeFn; use super::UpdateBuilder; mod merge_function; @@ -30,8 +31,6 @@ enum WriteMethod { GetMergePut, } -type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>; - fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> { let mut builder = Writer::builder(); builder.compression_type(typ); @@ -170,6 +169,7 @@ fn write_into_lmdb_database( } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[non_exhaustive] pub enum IndexDocumentsMethod { /// Replace the previous document with the new one, /// removing all the already known attributes.
@@ -180,6 +180,15 @@ pub enum IndexDocumentsMethod { UpdateDocuments, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub enum UpdateFormat { + /// The given update is a real **comma separated** CSV with headers on the first line. + Csv, + /// The given update is a JSON array with documents inside. + Json, + } + pub struct IndexDocuments<'t, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, @@ -192,6 +201,7 @@ pub struct IndexDocuments<'t, 'u, 'i> { chunk_fusing_shrink_size: Option<u64>, indexing_jobs: Option<usize>, update_method: IndexDocumentsMethod, + update_format: UpdateFormat, } impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { @@ -207,7 +217,8 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { chunk_compression_level: None, chunk_fusing_shrink_size: None, indexing_jobs: None, - update_method: IndexDocumentsMethod::ReplaceDocuments + update_method: IndexDocumentsMethod::ReplaceDocuments, + update_format: UpdateFormat::Json, } } @@ -256,6 +267,11 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { self } + pub fn update_format(&mut self, format: UpdateFormat) -> &mut Self { + self.update_format = format; + self + } + pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<()> where R: io::Read, @@ -274,6 +290,11 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { index_documents_method: self.update_method, }; + let output = match self.update_format { + UpdateFormat::Csv => transform.from_csv(reader)?, + UpdateFormat::Json => transform.from_json(reader)?, + }; + let TransformOutput { primary_key, fields_ids_map, @@ -282,7 +303,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { replaced_documents_ids, documents_count, documents_file, - } = transform.from_csv(reader)?; + } = output; // We delete the documents that this document addition replaces. This way we are // able to simply insert all the documents even if they already exist in the database. @@ -302,10 +323,17 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { let _deleted_documents_count = deletion_builder.execute()?; } - let mmap = unsafe { - memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")? + let mmap = if documents_count == 0 { + None + } else { + let mmap = unsafe { + memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")? + }; + Some(mmap) }; - let documents = grenad::Reader::new(mmap.as_ref())?; + + let bytes = mmap.as_ref().map(AsRef::as_ref).unwrap_or_default(); + let documents = grenad::Reader::new(bytes).unwrap(); // The enum which indicates the type of the readers // merges that are potentially done on different threads. @@ -499,13 +527,14 @@ mod tests { let path = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB - let index = Index::new(options, &path).unwrap(); // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -517,7 +546,9 @@ mod tests { // Second we send 1 document with id 1, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,updated kevin\n"[..]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -529,7 +560,9 @@ mod tests { // Third we send 3 documents again to replace the existing ones. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -544,7 +577,6 @@ mod tests { let path = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB - let index = Index::new(options, &path).unwrap(); // First we send 3 documents with duplicate ids and @@ -552,6 +584,7 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..]; let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); @@ -578,6 +611,7 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); let content = &b"id,age\n1,25\n"[..]; let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); @@ -607,13 +641,14 @@ mod tests { let path = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB - let index = Index::new(options, &path).unwrap(); // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -629,7 +664,9 @@ mod tests { // Second we send 1 document with the generated uuid, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = format!("id,name\n{},updated kevin", kevin_uuid); - IndexDocuments::new(&mut wtxn, &index).execute(content.as_bytes(), |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content.as_bytes(), |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -644,13 +681,14 @@ mod tests { let path = tempfile::tempdir().unwrap(); let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB - let index = Index::new(options, &path).unwrap(); // First we send 3 documents with ids from 1 to 3. 
let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -662,7 +700,9 @@ mod tests { // Second we send 1 document without specifying the id. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nnew kevin"[..]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 4 documents now. @@ -671,4 +711,74 @@ mod tests { assert_eq!(count, 4); drop(rtxn); } + + #[test] + fn empty_csv_update() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + + // First we send 0 documents and only headers. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"id,name\n"[..]; + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Csv); + builder.execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is no documents. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 0); + drop(rtxn); + } + + #[test] + fn json_documents() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + + // First we send 3 documents with an id for only one of them. + let mut wtxn = index.write_txn().unwrap(); + let content = &br#"[ + { "name": "kevin" }, + { "name": "kevina", "id": "21" }, + { "name": "benoit" } + ]"#[..]; + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Json); + builder.execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is 3 documents now. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 3); + drop(rtxn); + } + + #[test] + fn empty_json_update() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + + // First we send 0 documents. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"[]"[..]; + let mut builder = IndexDocuments::new(&mut wtxn, &index); + builder.update_format(UpdateFormat::Json); + builder.execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is no documents. 
+ let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 0); + drop(rtxn); + } } diff --git a/src/update/index_documents/transform.rs b/src/update/index_documents/transform.rs index 33f18a48a..b68d37b42 100644 --- a/src/update/index_documents/transform.rs +++ b/src/update/index_documents/transform.rs @@ -7,8 +7,9 @@ use anyhow::{anyhow, Context}; use fst::{IntoStreamer, Streamer}; use grenad::CompressionType; use roaring::RoaringBitmap; +use serde_json::{Map, Value}; -use crate::{BEU32, Index, FieldsIdsMap}; +use crate::{BEU32, MergeFn, Index, FieldsIdsMap}; use crate::update::AvailableDocumentsIds; use super::merge_function::merge_two_obkvs; use super::{create_writer, create_sorter, IndexDocumentsMethod}; @@ -35,6 +36,114 @@ pub struct Transform<'t, 'i> { } impl Transform<'_, '_> { + /// Extract the users ids, deduplicate and compute the new internal documents ids + /// and fields ids, writing all the documents under their internal ids into a final file. + /// + /// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids, + /// the replaced documents ids, the number of documents in this update and the file + /// containing all those documents. + pub fn from_json<R: io::Read>(self, reader: R) -> anyhow::Result<TransformOutput> { + let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; + let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); + let primary_key = self.index.primary_key(self.rtxn)?; + + // Deserialize the whole batch of documents in memory. + let documents: Vec<Map<String, Value>> = serde_json::from_reader(reader)?; + + // We extract the primary key from the first document in + // the batch if it hasn't already been defined in the index. + let primary_key = match primary_key { + Some(primary_key) => primary_key, + None => { + match documents.get(0).and_then(|doc| doc.keys().find(|k| k.contains("id"))) { + Some(key) => fields_ids_map.insert(&key).context("field id limit reached")?, + None => fields_ids_map.insert("id").context("field id limit reached")?, + } + }, + }; + + if documents.is_empty() { + return Ok(TransformOutput { + primary_key, + fields_ids_map, + users_ids_documents_ids: fst::Map::default(), + new_documents_ids: RoaringBitmap::new(), + replaced_documents_ids: RoaringBitmap::new(), + documents_count: 0, + documents_file: tempfile::tempfile()?, + }); + } + + // Get the primary key field name now, this way we will + // be able to get the value in the JSON Map document. + let primary_key_name = fields_ids_map + .name(primary_key) + .expect("found the primary key name") + .to_owned(); + + // We must choose the appropriate merge function for when two or more documents + // with the same user id must be merged or fully replaced in the same batch. + let merge_function = match self.index_documents_method { + IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv, + IndexDocumentsMethod::UpdateDocuments => merge_obkvs, + }; + + // We initialize the sorter with the user indexing settings.
+ let mut sorter = create_sorter( + merge_function, + self.chunk_compression_type, + self.chunk_compression_level, + self.chunk_fusing_shrink_size, + self.max_nb_chunks, + self.max_memory, + ); + + let mut json_buffer = Vec::new(); + let mut obkv_buffer = Vec::new(); + let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; + + for mut document in documents { + obkv_buffer.clear(); + let mut writer = obkv::KvWriter::new(&mut obkv_buffer); + + // We prepare the fields ids map with the documents keys. + for (key, _value) in &document { + fields_ids_map.insert(&key).context("field id limit reached")?; + } + + // We iterate in the fields ids ordered. + for (field_id, name) in fields_ids_map.iter() { + if let Some(value) = document.get(name) { + // We serialize the attribute values. + json_buffer.clear(); + serde_json::to_writer(&mut json_buffer, value)?; + writer.insert(field_id, &json_buffer)?; + } + } + + // We retrieve the user id from the document based on the primary key name, + // if the document id isn't present we generate a uuid. + let user_id = match document.remove(&primary_key_name) { + Some(value) => match value { + Value::String(string) => Cow::Owned(string), + Value::Number(number) => Cow::Owned(number.to_string()), + _ => return Err(anyhow!("documents ids must be either strings or numbers")), + }, + None => { + let uuid = uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer); + Cow::Borrowed(uuid) + }, + }; + + // We use the extracted/generated user id as the key for this document. + sorter.insert(user_id.as_bytes(), &obkv_buffer)?; + } + + // Now that we have a valid sorter that contains the user id and the obkv we + // give it to the last transforming function which returns the TransformOutput. + self.from_sorter(sorter, primary_key, fields_ids_map, users_ids_documents_ids) + } + /// Extract the users ids, deduplicate and compute the new internal documents ids /// and fields ids, writing all the documents under their internal ids into a final file. /// @@ -43,8 +152,6 @@ impl Transform<'_, '_> { /// containing all those documents. pub fn from_csv<R: io::Read>(self, reader: R) -> anyhow::Result<TransformOutput> { let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; - let documents_ids = self.index.documents_ids(self.rtxn)?; - let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); let mut csv = csv::Reader::from_reader(reader); @@ -85,11 +192,6 @@ impl Transform<'_, '_> { // the records fields in the fields ids map order and correctly generate the obkv. fields_ids.sort_unstable_by_key(|(field_id, _)| *field_id); - /// Only the last value associated with an id is kept. - fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { - obkvs.last().context("no last value").map(|last| last.clone().into_owned()) - } - // We initialize the sorter with the user indexing settings. let mut sorter = create_sorter( keep_latest_obkv, @@ -105,9 +207,9 @@ impl Transform<'_, '_> { let mut json_buffer = Vec::new(); let mut obkv_buffer = Vec::new(); let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; + let mut record = csv::StringRecord::new(); while csv.read_record(&mut record)?
{ - obkv_buffer.clear(); let mut writer = obkv::KvWriter::new(&mut obkv_buffer); @@ -138,6 +240,25 @@ impl Transform<'_, '_> { sorter.insert(user_id, &obkv_buffer)?; } + // Now that we have a valid sorter that contains the user id and the obkv we + // give it to the last transforming function which returns the TransformOutput. + self.from_sorter(sorter, primary_key_field_id, fields_ids_map, users_ids_documents_ids) + } + + /// Generate the TransformOutput based on the given sorter that can be generated from any + /// format like CSV, JSON or JSON lines. This sorter must contain a key that is the document + /// id for the user side and the value must be an obkv where keys are valid fields ids. + fn from_sorter( + self, + sorter: grenad::Sorter<MergeFn>, + primary_key: u8, + fields_ids_map: FieldsIdsMap, + users_ids_documents_ids: fst::Map<Cow<'_, [u8]>>, + ) -> anyhow::Result<TransformOutput> + { + let documents_ids = self.index.documents_ids(self.rtxn)?; + let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); + // Once we have sort and deduplicated the documents we write them into a final file. let mut final_sorter = create_sorter( |_docid, _obkvs| Err(anyhow!("cannot merge two documents")), @@ -150,6 +271,7 @@ impl Transform<'_, '_> { let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory(); let mut replaced_documents_ids = RoaringBitmap::new(); let mut new_documents_ids = RoaringBitmap::new(); + let mut obkv_buffer = Vec::new(); // While we write into final file we get or generate the internal documents ids. let mut documents_count = 0; @@ -219,7 +341,7 @@ impl Transform<'_, '_> { } Ok(TransformOutput { - primary_key: primary_key_field_id, + primary_key, fields_ids_map, users_ids_documents_ids: users_ids_documents_ids_builder.into_map(), new_documents_ids, @@ -229,3 +351,21 @@ impl Transform<'_, '_> { }) } } + +/// Only the last value associated with an id is kept. +fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { + obkvs.last().context("no last value").map(|last| last.clone().into_owned()) +} + +/// Merge all the obkvs in the order we see them. +fn merge_obkvs(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { + let mut iter = obkvs.iter(); + let first = iter.next().map(|b| b.clone().into_owned()).context("no first value")?; + Ok(iter.fold(first, |acc, current| { + let first = obkv::KvReader::new(&acc); + let second = obkv::KvReader::new(current); + let mut buffer = Vec::new(); + merge_two_obkvs(first, second, &mut buffer); + buffer + })) +}
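Note: a minimal usage sketch of the builder API introduced by this diff, mirroring the tests added above: the caller now picks an UpdateFormat before calling execute. The Index, IndexDocuments and UpdateFormat items are the ones shown in the patch; the `milli` crate name, the re-export paths and the caller's heed/tempfile/anyhow dependencies are assumptions, not part of this diff.

use heed::EnvOpenOptions;
use milli::Index;                                   // assumed crate name and re-export
use milli::update::{IndexDocuments, UpdateFormat};  // assumed re-export path

fn main() -> anyhow::Result<()> {
    // Open a small temporary index, exactly like the tests added in this diff.
    let path = tempfile::tempdir()?;
    let mut options = EnvOpenOptions::new();
    options.map_size(10 * 1024 * 1024); // 10 MB
    let index = Index::new(options, &path)?;

    // Index a JSON batch: the update format is now chosen explicitly
    // instead of CSV being the only supported input.
    let mut wtxn = index.write_txn()?;
    let content = &br#"[{ "id": "1", "name": "kevin" }]"#[..];
    let mut builder = IndexDocuments::new(&mut wtxn, &index);
    builder.update_format(UpdateFormat::Json);
    builder.execute(content, |_, _| ())?;
    wtxn.commit()?;
    Ok(())
}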
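Note: the Cargo.toml change also enables serde_json's `preserve_order` feature, which backs `serde_json::Map` with an insertion-ordered map (hence the new `indexmap` entry under serde_json in Cargo.lock). Since `from_json` walks the document keys to fill the fields ids map, this is presumably what keeps field ids in the order the documents were written rather than alphabetical order. A standalone sketch of that behavior, assuming a crate with `serde_json = { version = "1", features = ["preserve_order"] }`:

use serde_json::{Map, Value};

fn main() -> serde_json::Result<()> {
    // With `preserve_order`, Map iteration follows the order of the JSON text
    // ("name" before "id" here); without the feature the keys would come back sorted.
    let document: Map<String, Value> =
        serde_json::from_str(r#"{ "name": "kevin", "id": "21" }"#)?;
    let keys: Vec<&str> = document.keys().map(String::as_str).collect();
    assert_eq!(keys, ["name", "id"]);
    Ok(())
}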