From b67d385cf0da8d3910d192aa33b610dda7603ee7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 12 May 2024 13:04:01 +0200 Subject: [PATCH] Parallelize the edition functions --- milli/Cargo.toml | 3 +- milli/src/update/index_documents/mod.rs | 122 ++++++++++--------- milli/src/update/index_documents/parallel.rs | 86 +++++++++++++ 3 files changed, 151 insertions(+), 60 deletions(-) create mode 100644 milli/src/update/index_documents/parallel.rs diff --git a/milli/Cargo.toml b/milli/Cargo.toml index eff5d4e3f..d0513706f 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -79,12 +79,11 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", tiktoken-rs = "0.5.9" liquid = "0.26.6" arroy = "0.4.0" +rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax", "no_time", "sync"] } rand = "0.8.5" tracing = "0.1.40" ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" -rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax"] } -rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax", "no_time"] } [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index d3cf2fd12..3ed0d2db9 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -1,6 +1,7 @@ mod enrich; mod extract; mod helpers; +mod parallel; mod transform; mod typed_chunk; @@ -16,6 +17,7 @@ use grenad::{Merger, MergerBuilder}; use heed::types::Str; use heed::Database; use rand::SeedableRng; +use rayon::iter::{ParallelBridge, ParallelIterator}; use rhai::{Dynamic, Engine, OptimizationLevel, Scope}; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; @@ -37,11 +39,12 @@ use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchRead use crate::error::{Error, InternalError, UserError}; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; +use crate::update::index_documents::parallel::ImmutableObkvs; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, }; use crate::vector::EmbeddingConfigs; -use crate::{all_obkv_to_json, CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result}; +use crate::{CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 4; @@ -185,30 +188,6 @@ where return Ok((self, Ok((0, 0)))); } - /// Transform every field of a raw obkv store into a Rhai Map. - pub fn all_obkv_to_rhaimap( - obkv: obkv::KvReaderU16, - fields_ids_map: &FieldsIdsMap, - ) -> Result { - let all_keys = obkv.iter().map(|(k, _v)| k).collect::>(); - all_keys - .iter() - .copied() - .flat_map(|id| obkv.get(id).map(|value| (id, value))) - .map(|(id, value)| { - let name = fields_ids_map.name(id).ok_or( - crate::error::FieldIdMapMissingEntry::FieldId { - field_id: id, - process: "allobkv_to_rhaimap", - }, - )?; - let value = serde_json::from_slice(value) - .map_err(crate::error::InternalError::SerdeJson)?; - Ok((name.into(), value)) - }) - .collect() - } - fn rhaimap_to_object(map: rhai::Map) -> Object { let mut output = Object::new(); for (key, value) in map { @@ -220,13 +199,12 @@ where let mut engine = Engine::new(); engine.set_optimization_level(OptimizationLevel::Full); - //It is an arbitrary value. We need to let users define this in the settings. + // It is an arbitrary value. We need to let users define this in the settings. engine.set_max_operations(1_000_000); let ast = engine.compile(code).unwrap(); let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; let primary_key = self.index.primary_key(self.wtxn)?.unwrap(); - let primary_key_id = fields_ids_map.id(primary_key).unwrap(); let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?; let mut documents_to_remove = RoaringBitmap::new(); @@ -235,51 +213,79 @@ where None => Dynamic::from(()), }; - for docid in documents { - if (self.should_abort)() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } + enum DocumentEdition { + Deleted(crate::DocumentId), + Edited(Object), + Nothing, + } - let (document, document_object, document_id) = - match self.index.documents.get(self.wtxn, &docid)? { - Some(obkv) => { - let document_id_bytes = obkv.get(primary_key_id).unwrap(); - let document_id: serde_json::Value = - serde_json::from_slice(document_id_bytes).unwrap(); - let document = all_obkv_to_rhaimap(obkv, &fields_ids_map)?; - let document_object = all_obkv_to_json(obkv, &fields_ids_map)?; - (document, document_object, document_id) - } - None => panic!("documents must exist"), - }; + let immutable_obkvs = ImmutableObkvs::new( + self.wtxn, + self.index.documents, + fields_ids_map.clone(), + documents.clone(), + )?; + + let processing = documents.into_iter().par_bridge().map(|docid| { + let rhai_document = immutable_obkvs.rhai_map(docid)?.unwrap(); + let json_document = immutable_obkvs.json_map(docid)?.unwrap(); + let document_id = &json_document[primary_key]; let mut scope = Scope::new(); scope.push_constant_dynamic("context", context.clone()); - scope.push("doc", document); + scope.push("doc", rhai_document); let _ = engine.eval_ast_with_scope::(&mut scope, &ast).unwrap(); - let new_document = match scope.remove::("doc") { + + match scope.remove::("doc") { // If the "doc" variable has been removed from the scope // or set to (), we effectively delete the document. Some(doc) if doc.is_unit() => { - documents_to_remove.push(docid); - continue; + return Ok(DocumentEdition::Deleted(docid)); } None => unreachable!(), Some(document) => match document.try_cast() { - Some(document) => rhaimap_to_object(document), + Some(document) => { + let new_document = rhaimap_to_object(document); + if json_document != new_document { + assert_eq!( + Some(document_id), + new_document.get(primary_key), + "you cannot change the document id when editing documents" + ); + return Ok(DocumentEdition::Edited(new_document)); + } + } None => panic!("Why is \"doc\" no longer a Map?"), }, - }; - - if document_object != new_document { - assert_eq!( - Some(&document_id), - new_document.get(primary_key), - "you cannot change the document id when editing documents" - ); - documents_batch_builder.append_json_object(&new_document)?; } - } + + Ok(DocumentEdition::Nothing) as Result<_> + }); + + std::thread::scope(|s| { + let (send, recv) = std::sync::mpsc::sync_channel(100); + s.spawn(move || processing.for_each(|el| drop(send.send(el)))); + + for result in recv { + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + + match result? { + DocumentEdition::Deleted(docid) => { + documents_to_remove.push(docid); + } + DocumentEdition::Edited(new_document) => { + documents_batch_builder.append_json_object(&new_document)?; + } + DocumentEdition::Nothing => (), + } + } + + Ok(()) + })?; + + drop(immutable_obkvs); let file = documents_batch_builder.into_inner()?; let reader = DocumentsBatchReader::from_reader(file)?; diff --git a/milli/src/update/index_documents/parallel.rs b/milli/src/update/index_documents/parallel.rs new file mode 100644 index 000000000..9769e8ab5 --- /dev/null +++ b/milli/src/update/index_documents/parallel.rs @@ -0,0 +1,86 @@ +use heed::types::Bytes; +use heed::{Database, RoTxn}; +use obkv::KvReaderU16; +use roaring::RoaringBitmap; + +use crate::{all_obkv_to_json, DocumentId, FieldsIdsMap, Object, ObkvCodec, Result, BEU32}; + +pub struct ImmutableObkvs<'t> { + ids: RoaringBitmap, + fields_ids_map: FieldsIdsMap, + slices: Vec<&'t [u8]>, +} + +impl<'t> ImmutableObkvs<'t> { + /// Creates the structure by fetching all the OBKVs + /// and keeping the transaction making the pointers valid. + pub fn new( + rtxn: &'t RoTxn, + documents_database: Database, + fields_ids_map: FieldsIdsMap, + subset: RoaringBitmap, + ) -> heed::Result { + let mut slices = Vec::new(); + let documents_database = documents_database.remap_data_type::(); + for docid in &subset { + let slice = documents_database.get(rtxn, &docid)?.unwrap(); + slices.push(slice); + } + + Ok(ImmutableObkvs { ids: subset, fields_ids_map, slices }) + } + + /// Returns the OBKVs identified by the given ID. + pub fn obkv(&self, docid: DocumentId) -> heed::Result>> { + match self + .ids + .rank(docid) + .checked_sub(1) + .and_then(|offset| self.slices.get(offset as usize)) + { + Some(bytes) => Ok(Some(KvReaderU16::new(bytes))), + None => Ok(None), + } + } + + /// Returns the owned rhai::Map identified by the given ID. + pub fn rhai_map(&self, docid: DocumentId) -> Result> { + let obkv = match self.obkv(docid) { + Ok(Some(obkv)) => obkv, + Ok(None) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + let all_keys = obkv.iter().map(|(k, _v)| k).collect::>(); + let map: Result = all_keys + .iter() + .copied() + .flat_map(|id| obkv.get(id).map(|value| (id, value))) + .map(|(id, value)| { + let name = self.fields_ids_map.name(id).ok_or( + crate::error::FieldIdMapMissingEntry::FieldId { + field_id: id, + process: "allobkv_to_rhaimap", + }, + )?; + let value = serde_json::from_slice(value) + .map_err(crate::error::InternalError::SerdeJson)?; + Ok((name.into(), value)) + }) + .collect(); + + map.map(Some) + } + + pub fn json_map(&self, docid: DocumentId) -> Result> { + let obkv = match self.obkv(docid) { + Ok(Some(obkv)) => obkv, + Ok(None) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + all_obkv_to_json(obkv, &self.fields_ids_map).map(Some) + } +} + +unsafe impl Sync for ImmutableObkvs<'_> {}