Parallelize the edition functions

parent dfecb25814
commit b67d385cf0
milli/Cargo.toml

```diff
@@ -79,12 +79,11 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls",
 tiktoken-rs = "0.5.9"
 liquid = "0.26.6"
 arroy = "0.4.0"
+rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax", "no_time", "sync"] }
 rand = "0.8.5"
 tracing = "0.1.40"
 ureq = { version = "2.10.0", features = ["json"] }
 url = "2.5.2"
-rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax"] }
-rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax", "no_time"] }
 
 [dev-dependencies]
 mimalloc = { version = "0.1.43", default-features = false }
```
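The notable change here is the new `sync` feature on the `rhai` dependency (the duplicate lines are dropped in passing). With `sync`, Rhai's `Engine`, `AST`, and `Dynamic` become `Send + Sync`, which is what lets the hunks below evaluate one compiled script from many rayon workers at once. A minimal standalone sketch of that property (an illustration, not Meilisearch code):

```rust
use rhai::{Engine, Scope};

fn main() {
    let engine = Engine::new();
    // With the `sync` feature enabled, `Engine` and `AST` are Send + Sync,
    // so a script compiled once can be evaluated concurrently by reference.
    let ast = engine.compile("doc * 2").unwrap();

    std::thread::scope(|s| {
        for docid in 0..4_i64 {
            let (engine, ast) = (&engine, &ast);
            s.spawn(move || {
                let mut scope = Scope::new();
                scope.push("doc", docid);
                let result = engine.eval_ast_with_scope::<i64>(&mut scope, ast).unwrap();
                println!("{docid} -> {result}");
            });
        }
    });
}
```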
milli/src/update/index_documents/mod.rs

```diff
@@ -1,6 +1,7 @@
 mod enrich;
 mod extract;
 mod helpers;
+mod parallel;
 mod transform;
 mod typed_chunk;
 
@@ -16,6 +17,7 @@ use grenad::{Merger, MergerBuilder};
 use heed::types::Str;
 use heed::Database;
 use rand::SeedableRng;
+use rayon::iter::{ParallelBridge, ParallelIterator};
 use rhai::{Dynamic, Engine, OptimizationLevel, Scope};
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
@@ -37,11 +39,12 @@ use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchRead
 use crate::error::{Error, InternalError, UserError};
 use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
+use crate::update::index_documents::parallel::ImmutableObkvs;
 use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
 };
 use crate::vector::EmbeddingConfigs;
-use crate::{all_obkv_to_json, CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result};
+use crate::{CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result};
 
 static MERGED_DATABASE_COUNT: usize = 7;
 static PREFIX_DATABASE_COUNT: usize = 4;
@@ -185,30 +188,6 @@ where
             return Ok((self, Ok((0, 0))));
         }
 
-        /// Transform every field of a raw obkv store into a Rhai Map.
-        pub fn all_obkv_to_rhaimap(
-            obkv: obkv::KvReaderU16,
-            fields_ids_map: &FieldsIdsMap,
-        ) -> Result<rhai::Map> {
-            let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
-            all_keys
-                .iter()
-                .copied()
-                .flat_map(|id| obkv.get(id).map(|value| (id, value)))
-                .map(|(id, value)| {
-                    let name = fields_ids_map.name(id).ok_or(
-                        crate::error::FieldIdMapMissingEntry::FieldId {
-                            field_id: id,
-                            process: "allobkv_to_rhaimap",
-                        },
-                    )?;
-                    let value = serde_json::from_slice(value)
-                        .map_err(crate::error::InternalError::SerdeJson)?;
-                    Ok((name.into(), value))
-                })
-                .collect()
-        }
-
         fn rhaimap_to_object(map: rhai::Map) -> Object {
             let mut output = Object::new();
             for (key, value) in map {
@@ -220,13 +199,12 @@ where
 
         let mut engine = Engine::new();
        engine.set_optimization_level(OptimizationLevel::Full);
-        //It is an arbitrary value. We need to let users define this in the settings.
+        // It is an arbitrary value. We need to let users define this in the settings.
         engine.set_max_operations(1_000_000);
 
         let ast = engine.compile(code).unwrap();
         let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
         let primary_key = self.index.primary_key(self.wtxn)?.unwrap();
-        let primary_key_id = fields_ids_map.id(primary_key).unwrap();
         let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
         let mut documents_to_remove = RoaringBitmap::new();
 
```
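`set_max_operations` (whose comment is tidied above) is Rhai's built-in fuel limit: evaluation stops with an error once the script has executed that many operations, so a runaway edition function cannot hang indexing. A standalone illustration of that guard (not the Meilisearch code itself):

```rust
use rhai::Engine;

fn main() {
    let mut engine = Engine::new();
    engine.set_max_operations(1_000_000);

    // An endless script is interrupted with `ErrorTooManyOperations`
    // instead of spinning forever.
    let result = engine.eval::<i64>("let x = 0; loop { x += 1 }");
    assert!(result.is_err());
    println!("{result:?}");
}
```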
milli/src/update/index_documents/mod.rs (continued)

```diff
@@ -235,51 +213,79 @@ where
             None => Dynamic::from(()),
         };
 
-        for docid in documents {
+        enum DocumentEdition {
+            Deleted(crate::DocumentId),
+            Edited(Object),
+            Nothing,
+        }
+
+        let immutable_obkvs = ImmutableObkvs::new(
+            self.wtxn,
+            self.index.documents,
+            fields_ids_map.clone(),
+            documents.clone(),
+        )?;
+
+        let processing = documents.into_iter().par_bridge().map(|docid| {
+            let rhai_document = immutable_obkvs.rhai_map(docid)?.unwrap();
+            let json_document = immutable_obkvs.json_map(docid)?.unwrap();
+            let document_id = &json_document[primary_key];
+
+            let mut scope = Scope::new();
+            scope.push_constant_dynamic("context", context.clone());
+            scope.push("doc", rhai_document);
+            let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap();
+
+            match scope.remove::<Dynamic>("doc") {
+                // If the "doc" variable has been removed from the scope
+                // or set to (), we effectively delete the document.
+                Some(doc) if doc.is_unit() => {
+                    return Ok(DocumentEdition::Deleted(docid));
+                }
+                None => unreachable!(),
+                Some(document) => match document.try_cast() {
+                    Some(document) => {
+                        let new_document = rhaimap_to_object(document);
+                        if json_document != new_document {
+                            assert_eq!(
+                                Some(document_id),
+                                new_document.get(primary_key),
+                                "you cannot change the document id when editing documents"
+                            );
+                            return Ok(DocumentEdition::Edited(new_document));
+                        }
+                    }
+                    None => panic!("Why is \"doc\" no longer a Map?"),
+                },
+            }
+
+            Ok(DocumentEdition::Nothing) as Result<_>
+        });
+
+        std::thread::scope(|s| {
+            let (send, recv) = std::sync::mpsc::sync_channel(100);
+            s.spawn(move || processing.for_each(|el| drop(send.send(el))));
+
+            for result in recv {
                 if (self.should_abort)() {
                     return Err(Error::InternalError(InternalError::AbortedIndexation));
                 }
 
-            let (document, document_object, document_id) =
-                match self.index.documents.get(self.wtxn, &docid)? {
-                    Some(obkv) => {
-                        let document_id_bytes = obkv.get(primary_key_id).unwrap();
-                        let document_id: serde_json::Value =
-                            serde_json::from_slice(document_id_bytes).unwrap();
-                        let document = all_obkv_to_rhaimap(obkv, &fields_ids_map)?;
-                        let document_object = all_obkv_to_json(obkv, &fields_ids_map)?;
-                        (document, document_object, document_id)
-                    }
-                    None => panic!("documents must exist"),
-                };
-
-            let mut scope = Scope::new();
-            scope.push_constant_dynamic("context", context.clone());
-            scope.push("doc", document);
-            let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap();
-            let new_document = match scope.remove::<Dynamic>("doc") {
-                // If the "doc" variable has been removed from the scope
-                // or set to (), we effectively delete the document.
-                Some(doc) if doc.is_unit() => {
-                    documents_to_remove.push(docid);
-                    continue;
-                }
-                None => unreachable!(),
-                Some(document) => match document.try_cast() {
-                    Some(document) => rhaimap_to_object(document),
-                    None => panic!("Why is \"doc\" no longer a Map?"),
-                },
-            };
-
-            if document_object != new_document {
-                assert_eq!(
-                    Some(&document_id),
-                    new_document.get(primary_key),
-                    "you cannot change the document id when editing documents"
-                );
-                documents_batch_builder.append_json_object(&new_document)?;
-            }
-        }
+                match result? {
+                    DocumentEdition::Deleted(docid) => {
+                        documents_to_remove.push(docid);
+                    }
+                    DocumentEdition::Edited(new_document) => {
+                        documents_batch_builder.append_json_object(&new_document)?;
+                    }
+                    DocumentEdition::Nothing => (),
+                }
+            }
+
+            Ok(())
+        })?;
+
+        drop(immutable_obkvs);
 
         let file = documents_batch_builder.into_inner()?;
         let reader = DocumentsBatchReader::from_reader(file)?;
```
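The shape of the new code is worth spelling out: `par_bridge` fans the documents out to rayon workers, and a bounded `sync_channel` funnels the results back to the indexing thread, which remains the only writer to `documents_batch_builder` and `documents_to_remove` (and the only place `should_abort` is checked). A self-contained sketch of that fan-out/fan-in shape, with a doubling closure standing in for the Rhai evaluation:

```rust
use rayon::iter::{ParallelBridge, ParallelIterator};

fn main() {
    let documents: Vec<u32> = (0..1_000).collect();

    let total = std::thread::scope(|s| {
        // A bounded channel provides backpressure: workers block once
        // 100 results are waiting, instead of buffering everything.
        let (send, recv) = std::sync::mpsc::sync_channel(100);

        // Fan out: rayon workers process documents in arbitrary order.
        s.spawn(move || {
            documents
                .into_iter()
                .par_bridge()
                .map(|docid| docid * 2) // stand-in for the script evaluation
                .for_each(|el| drop(send.send(el)));
        });

        // Fan in: a single consumer applies all side effects sequentially.
        let mut sum: u64 = 0;
        for result in recv {
            sum += u64::from(result);
        }
        sum
    });

    assert_eq!(total, 999_000);
}
```

Because the consumer is single-threaded, none of the mutable indexing state needs locks; the channel's bound also keeps memory flat when workers outpace the writer.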
milli/src/update/index_documents/parallel.rs (new file, 86 lines)

```diff
@@ -0,0 +1,86 @@
+use heed::types::Bytes;
+use heed::{Database, RoTxn};
+use obkv::KvReaderU16;
+use roaring::RoaringBitmap;
+
+use crate::{all_obkv_to_json, DocumentId, FieldsIdsMap, Object, ObkvCodec, Result, BEU32};
+
+pub struct ImmutableObkvs<'t> {
+    ids: RoaringBitmap,
+    fields_ids_map: FieldsIdsMap,
+    slices: Vec<&'t [u8]>,
+}
+
+impl<'t> ImmutableObkvs<'t> {
+    /// Creates the structure by fetching all the OBKVs
+    /// and keeping the transaction making the pointers valid.
+    pub fn new(
+        rtxn: &'t RoTxn,
+        documents_database: Database<BEU32, ObkvCodec>,
+        fields_ids_map: FieldsIdsMap,
+        subset: RoaringBitmap,
+    ) -> heed::Result<Self> {
+        let mut slices = Vec::new();
+        let documents_database = documents_database.remap_data_type::<Bytes>();
+        for docid in &subset {
+            let slice = documents_database.get(rtxn, &docid)?.unwrap();
+            slices.push(slice);
+        }
+
+        Ok(ImmutableObkvs { ids: subset, fields_ids_map, slices })
+    }
+
+    /// Returns the OBKVs identified by the given ID.
+    pub fn obkv(&self, docid: DocumentId) -> heed::Result<Option<KvReaderU16<'t>>> {
+        match self
+            .ids
+            .rank(docid)
+            .checked_sub(1)
+            .and_then(|offset| self.slices.get(offset as usize))
+        {
+            Some(bytes) => Ok(Some(KvReaderU16::new(bytes))),
+            None => Ok(None),
+        }
+    }
+
+    /// Returns the owned rhai::Map identified by the given ID.
+    pub fn rhai_map(&self, docid: DocumentId) -> Result<Option<rhai::Map>> {
+        let obkv = match self.obkv(docid) {
+            Ok(Some(obkv)) => obkv,
+            Ok(None) => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+
+        let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
+        let map: Result<rhai::Map> = all_keys
+            .iter()
+            .copied()
+            .flat_map(|id| obkv.get(id).map(|value| (id, value)))
+            .map(|(id, value)| {
+                let name = self.fields_ids_map.name(id).ok_or(
+                    crate::error::FieldIdMapMissingEntry::FieldId {
+                        field_id: id,
+                        process: "allobkv_to_rhaimap",
+                    },
+                )?;
+                let value = serde_json::from_slice(value)
+                    .map_err(crate::error::InternalError::SerdeJson)?;
+                Ok((name.into(), value))
+            })
+            .collect();
+
+        map.map(Some)
+    }
+
+    pub fn json_map(&self, docid: DocumentId) -> Result<Option<Object>> {
+        let obkv = match self.obkv(docid) {
+            Ok(Some(obkv)) => obkv,
+            Ok(None) => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+
+        all_obkv_to_json(obkv, &self.fields_ids_map).map(Some)
+    }
+}
+
+unsafe impl Sync for ImmutableObkvs<'_> {}
```
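One non-obvious trick in `obkv`: since `slices` is filled by iterating the bitmap in ascending order, a document's slot can be recovered from the bitmap alone as `rank(docid) - 1`, with `checked_sub` mapping ids below the smallest member to `None`. This only holds for ids that are members of the subset, which the callers guarantee since the docids come from that same bitmap. A standalone sketch with string slices standing in for the OBKV bytes:

```rust
use roaring::RoaringBitmap;

fn main() {
    // The bitmap and the vector are built in the same ascending order,
    // just like `ImmutableObkvs::new` does with the document slices.
    let ids: RoaringBitmap = [10u32, 42, 300].into_iter().collect();
    let slices = ["doc-10", "doc-42", "doc-300"];

    for docid in [10u32, 42, 300, 7] {
        // `rank` counts members <= docid, so for a member it yields its
        // 1-based position; `checked_sub(1)` turns that into a 0-based
        // index (and rejects ids smaller than every member).
        let slice = ids
            .rank(docid)
            .checked_sub(1)
            .and_then(|offset| slices.get(offset as usize));
        println!("{docid} -> {slice:?}");
    }
}
```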