From 51acd7a3815cfda4222deff14e88b174fbf06af2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= <clement@meilisearch.com>
Date: Wed, 16 Apr 2025 22:01:50 +0200
Subject: [PATCH] Seems to work great, still need to read function from settings

---
 crates/milli/src/update/new/document.rs       | 54 ++++++++++++++++++
 .../update/new/indexer/document_operation.rs  | 55 +++++++++++++++++--
 .../update/new/indexer/update_by_function.rs  |  2 +-
 3 files changed, 105 insertions(+), 6 deletions(-)

diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index 1ef44fc8d..e6178d1df 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -406,6 +406,60 @@ impl<'doc> Versions<'doc> {
         Ok(Some(Self::single(data)))
     }
 
+    pub fn multiple_with_edits(
+        doc: Option<rhai::Map>,
+        mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
+        engine: &rhai::Engine,
+        edit_function: &rhai::AST,
+        doc_alloc: &'doc bumpalo::Bump,
+    ) -> Result<Option<Self>> {
+        let Some(data) = versions.next() else { return Ok(None) };
+
+        let mut doc = doc.unwrap_or_default();
+        let mut data = data?;
+        for version in versions {
+            let version = version?;
+            for (field, value) in version {
+                data.insert(field, value);
+            }
+
+            let mut scope = rhai::Scope::new();
+            data.iter().for_each(|(k, v)| {
+                doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
+            });
+            scope.push("doc", doc.clone());
+
+            let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
+            data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
+            for (key, value) in scope.get_value::<rhai::Map>("doc").unwrap() {
+                let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
+                serde_json::to_writer(&mut vec, &value).unwrap();
+                let key = doc_alloc.alloc_str(key.as_str());
+                let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
+                data.insert(key, raw_value);
+            }
+        }
+
+        // We must also run the code after the last change
+        let mut scope = rhai::Scope::new();
+        data.iter().for_each(|(k, v)| {
+            doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
+        });
+        scope.push("doc", doc);
+
+        let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
+        data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
+        for (key, value) in scope.get_value::<rhai::Map>("doc").unwrap() {
+            let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
+            serde_json::to_writer(&mut vec, &value).unwrap();
+            let key = doc_alloc.alloc_str(key.as_str());
+            let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
+            data.insert(key, raw_value);
+        }
+
+        Ok(Some(Self::single(data)))
+    }
+
     pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
         Self { data: version }
     }
diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs
index ca433c043..766dcb32e 100644
--- a/crates/milli/src/update/new/indexer/document_operation.rs
+++ b/crates/milli/src/update/new/indexer/document_operation.rs
@@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key;
 use crate::documents::PrimaryKey;
 use crate::progress::{AtomicPayloadStep, Progress};
 use crate::update::new::document::Versions;
+use crate::update::new::indexer::update_by_function::obkv_to_rhaimap;
 use crate::update::new::steps::IndexingStep;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::{Deletion, Insertion, Update};
@@ -157,7 +158,23 @@ impl<'pl> DocumentOperation<'pl> {
             .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
 
         let docids_version_offsets = docids_version_offsets.into_bump_slice();
-        Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
+        let engine = rhai::Engine::new();
+        let ast = Some(
+            r#"
+                let incr = doc.remove("incr_likes");
+                if incr != () {
+                    doc.likes = (doc.likes ?? 0) + incr;
+                }
+            "#,
+        )
+        .map(|f| engine.compile(f).unwrap());
+        let fidmap = index.fields_ids_map(rtxn)?;
+
+        Ok((
+            DocumentOperationChanges { docids_version_offsets, engine, ast, fidmap },
+            operations_stats,
+            primary_key,
+        ))
     }
 }
 
@@ -418,7 +435,15 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
         'pl: 'doc,
     {
         let (external_doc, payload_operations) = item;
-        payload_operations.merge(external_doc, &context.doc_alloc)
+        payload_operations.merge(
+            &context.rtxn,
+            context.index,
+            &self.fidmap,
+            &self.engine,
+            self.ast.as_ref(),
+            external_doc,
+            &context.doc_alloc,
+        )
     }
 
     fn len(&self) -> usize {
@@ -427,6 +452,9 @@
 }
 
 pub struct DocumentOperationChanges<'pl> {
+    engine: rhai::Engine,
+    ast: Option<rhai::AST>,
+    fidmap: FieldsIdsMap,
     docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
 }
 
@@ -489,10 +517,13 @@ impl<'pl> PayloadOperations<'pl> {
     }
 
     /// Returns only the most recent version of a document based on the updates from the payloads.
-    ///
-    /// This function is only meant to be used when doing a replacement and not an update.
     fn merge<'doc>(
         &self,
+        rtxn: &heed::RoTxn,
+        index: &Index,
+        fidmap: &FieldsIdsMap,
+        engine: &rhai::Engine,
+        ast: Option<&rhai::AST>,
         external_doc: &'doc str,
         doc_alloc: &'doc Bump,
     ) -> Result<Option<DocumentChange<'doc>>>
@@ -556,7 +587,21 @@ impl<'pl> PayloadOperations<'pl> {
                     Ok(document)
                 });
 
-                let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
+                let versions = match ast {
+                    Some(ast) => {
+                        let doc = index
+                            .documents
+                            .get(rtxn, &self.docid)?
+                            .map(|obkv| obkv_to_rhaimap(obkv, fidmap))
+                            .transpose()?;
+                        Versions::multiple_with_edits(doc, versions, engine, ast, doc_alloc)?
+                    }
+                    None => Versions::multiple(versions)?,
+                };
+
+                let Some(versions) = versions else {
+                    return Ok(None);
+                };
 
                 if self.is_new {
                     Ok(Some(DocumentChange::Insertion(Insertion::create(
diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs
index 3001648e6..4f232c40b 100644
--- a/crates/milli/src/update/new/indexer/update_by_function.rs
+++ b/crates/milli/src/update/new/indexer/update_by_function.rs
@@ -189,7 +189,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     }
 }
 
-fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
+pub fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
     let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
     let map: Result<rhai::Map> = all_keys
         .iter()