Seems to work great, still need to read function from settings

This commit is contained in:
Clément Renault 2025-04-16 22:01:50 +02:00
parent 3ec5b9d488
commit 51acd7a381
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 105 additions and 6 deletions

View File

@ -406,6 +406,60 @@ impl<'doc> Versions<'doc> {
Ok(Some(Self::single(data)))
}
/// Merges the successive versions of a document and runs a Rhai edit function on the result.
///
/// The first item of `versions` seeds the accumulated field map. Each following version is
/// overlaid on top of it (a field present in both takes the value of the later version) and,
/// after each overlay, `edit_function` is evaluated with a `doc` variable in scope. The map
/// held by `doc` once the script has finished replaces the accumulated field map. The edit
/// function is evaluated one more time after the last version, which is also the only
/// evaluation when `versions` yields a single item.
///
/// # Parameters
///
/// - `doc`: the map the accumulated fields are copied onto before every evaluation;
///   an empty map is used when `None`.
/// - `versions`: the versions to merge, in the order in which they must be applied.
/// - `engine`: the Rhai engine used to evaluate `edit_function`.
/// - `edit_function`: the compiled script; it reads and mutates the `doc` variable.
/// - `doc_alloc`: the bump allocator backing the keys and raw JSON values of the result.
///
/// # Returns
///
/// `Ok(None)` when `versions` is empty, otherwise the edited fields wrapped by `Self::single`.
///
/// # Errors
///
/// Returns the first error yielded by `versions`.
///
/// # Panics
///
/// Panics when a raw value cannot be converted to a Rhai value, when the script fails to
/// evaluate, when `doc` cannot be read back from the scope as a map, or when an edited value
/// cannot be serialized back to JSON.
/// TODO: surface these failures through `Result` instead of panicking.
///
/// NOTE(review): the map passed as `doc` is never refreshed from the script output, only the
/// accumulated fields are. A field that the script removes from its copy is therefore still
/// present in the map handed to the next evaluation. Confirm whether this is intended.
pub fn multiple_with_edits(
    doc: Option<rhai::Map>,
    mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
    engine: &rhai::Engine,
    edit_function: &rhai::AST,
    doc_alloc: &'doc bumpalo::Bump,
) -> Result<Option<Self>> {
    let Some(data) = versions.next() else { return Ok(None) };
    let mut doc = doc.unwrap_or_default();
    let mut data = data?;

    for version in versions {
        let version = version?;
        for (field, value) in version {
            data.insert(field, value);
        }

        Self::overlay_fields_onto_rhai_map(&mut doc, &data);
        // `doc` is needed again by the next evaluation, so the script works on a copy.
        data = Self::eval_edit_function(doc.clone(), engine, edit_function, doc_alloc);
    }

    // We must also run the code after the last change
    Self::overlay_fields_onto_rhai_map(&mut doc, &data);
    let data = Self::eval_edit_function(doc, engine, edit_function, doc_alloc);

    Ok(Some(Self::single(data)))
}

/// Copies every field of `data` into `doc`, converting each raw JSON value to a Rhai value.
fn overlay_fields_onto_rhai_map(doc: &mut rhai::Map, data: &RawMap<'doc, FxBuildHasher>) {
    for (field, value) in data.iter() {
        let value: rhai::Dynamic = serde_json::from_str(value.get())
            .expect("a raw document field must be convertible to a Rhai value");
        doc.insert(field.into(), value);
    }
}

/// Evaluates `edit_function` with `doc` bound to the `doc` script variable and returns the
/// fields of the edited map, re-serialized as raw JSON values allocated in `doc_alloc`.
fn eval_edit_function(
    doc: rhai::Map,
    engine: &rhai::Engine,
    edit_function: &rhai::AST,
    doc_alloc: &'doc bumpalo::Bump,
) -> RawMap<'doc, FxBuildHasher> {
    let mut scope = rhai::Scope::new();
    scope.push("doc", doc);

    // Only the state of `doc` after the script ran matters: the value the script
    // evaluates to is discarded.
    let _ = engine
        .eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function)
        .expect("the edit function failed to evaluate");

    let edited = scope
        .get_value::<rhai::Map>("doc")
        .expect("`doc` must still be readable as a map once the edit function has run");

    let mut data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
    for (key, value) in edited {
        // Serialize straight into the bump allocator so that the raw value can borrow
        // its bytes for `'doc`.
        let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
        serde_json::to_writer(&mut vec, &value)
            .expect("an edited value must be serializable to JSON");
        let key = doc_alloc.alloc_str(key.as_str());
        let raw_value = serde_json::from_slice(vec.into_bump_slice())
            .expect("freshly serialized JSON must be readable as a raw value");
        data.insert(key, raw_value);
    }
    data
}
/// Wraps the given field map in `Self`, storing it unchanged in the `data` field.
pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
    let data = version;
    Self { data }
}

View File

@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey;
use crate::progress::{AtomicPayloadStep, Progress};
use crate::update::new::document::Versions;
use crate::update::new::indexer::update_by_function::obkv_to_rhaimap;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, Insertion, Update};
@ -157,7 +158,23 @@ impl<'pl> DocumentOperation<'pl> {
.sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
let docids_version_offsets = docids_version_offsets.into_bump_slice();
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
let engine = rhai::Engine::new();
let ast = Some(
r#"
let incr = doc.remove("incr_likes");
if incr != () {
doc.likes = (doc.likes ?? 0) + incr;
}
"#,
)
.map(|f| engine.compile(f).unwrap());
let fidmap = index.fields_ids_map(rtxn)?;
Ok((
DocumentOperationChanges { docids_version_offsets, engine, ast, fidmap },
operations_stats,
primary_key,
))
}
}
@ -418,7 +435,15 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
'pl: 'doc,
{
let (external_doc, payload_operations) = item;
payload_operations.merge(external_doc, &context.doc_alloc)
payload_operations.merge(
&context.rtxn,
context.index,
&self.fidmap,
&self.engine,
self.ast.as_ref(),
external_doc,
&context.doc_alloc,
)
}
fn len(&self) -> usize {
@ -427,6 +452,9 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
}
/// The document changes produced from a set of payload operations, bundled with what is
/// needed to run an optional Rhai edit function while merging each document.
pub struct DocumentOperationChanges<'pl> {
/// Rhai engine handed to `PayloadOperations::merge` to evaluate the edit function.
engine: rhai::Engine,
/// Compiled edit function; when `None`, the merge does not evaluate any script.
ast: Option<rhai::AST>,
/// Fields ids map read from the index, used to turn a stored document into a Rhai map.
fidmap: FieldsIdsMap,
/// One entry per document: a string identifier paired with the operations to merge for it.
/// NOTE(review): the string is presumably the external document id — confirm.
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
}
@ -489,10 +517,13 @@ impl<'pl> PayloadOperations<'pl> {
}
/// Returns only the most recent version of a document based on the updates from the payloads.
///
/// This function is only meant to be used when doing a replacement and not an update.
fn merge<'doc>(
&self,
rtxn: &heed::RoTxn,
index: &Index,
fidmap: &FieldsIdsMap,
engine: &rhai::Engine,
ast: Option<&rhai::AST>,
external_doc: &'doc str,
doc_alloc: &'doc Bump,
) -> Result<Option<DocumentChange<'doc>>>
@ -556,7 +587,21 @@ impl<'pl> PayloadOperations<'pl> {
Ok(document)
});
let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
let versions = match ast {
Some(ast) => {
let doc = index
.documents
.get(rtxn, &self.docid)?
.map(|obkv| obkv_to_rhaimap(obkv, fidmap))
.transpose()?;
Versions::multiple_with_edits(doc, versions, engine, ast, doc_alloc)?
}
None => Versions::multiple(versions)?,
};
let Some(versions) = versions else {
return Ok(None);
};
if self.is_new {
Ok(Some(DocumentChange::Insertion(Insertion::create(

View File

@ -189,7 +189,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
}
}
fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
pub fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
let map: Result<rhai::Map> = all_keys
.iter()