It works perfectly with some Rhai

This commit is contained in:
Clément Renault 2024-05-09 13:07:32 +02:00
parent efc156a4a4
commit 2d97164d9f
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 729 additions and 949 deletions

1561
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -83,7 +83,7 @@ rand = "0.8.5"
tracing = "0.1.40"
ureq = { version = "2.10.0", features = ["json"] }
url = "2.5.2"
piccolo = "0.3.1"
rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax"] }
[dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false }

View File

@ -16,6 +16,7 @@ use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;
use rand::SeedableRng;
use rhai::{Engine, Scope};
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use slice_group_by::GroupBy;
@ -32,7 +33,7 @@ pub use self::helpers::{
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
@ -40,7 +41,7 @@ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::vector::EmbeddingConfigs;
use crate::{fields_ids_map, CboRoaringBitmapCodec, Index, Result};
use crate::{CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result};
static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 4;
@ -174,7 +175,7 @@ where
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
pub fn edit_documents(
mut self,
self,
documents: &RoaringBitmap,
code: &str,
) -> Result<(Self, StdResult<u64, UserError>)> {
@ -183,49 +184,75 @@ where
return Ok((self, Ok(0)));
}
let mut lua = piccolo::Lua::core();
let executor = lua.enter(|ctx| ctx.stash(piccolo::Executor::new(ctx)));
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
for docid in documents {
let document = match self.index.documents.get(self.wtxn, &docid)? {
Some(document) => document,
None => panic!("a document should always exists"),
};
lua.try_enter(|ctx| {
let closure = match piccolo::Closure::load(
ctx,
None,
("return ".to_string() + code).as_bytes(),
) {
Ok(closure) => closure,
Err(_) => piccolo::Closure::load(ctx, None, code.as_bytes())?,
};
let function = piccolo::Function::Closure(closure);
let table = piccolo::Table::new(&ctx);
table.set(ctx, "internal-id", docid)?;
table.set(ctx, "title", "hello")?;
table.set(ctx, "description", "world")?;
dbg!(&table);
ctx.set_global("doc", table)?;
ctx.fetch(&executor).restart(ctx, function, ());
Ok(())
/// Transform every field of a raw obkv store into a JSON Object.
pub fn all_obkv_to_rhaimap(
obkv: obkv::KvReaderU16,
fields_ids_map: &FieldsIdsMap,
) -> Result<rhai::Map> {
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
all_keys
.iter()
.copied()
.flat_map(|id| obkv.get(id).map(|value| (id, value)))
.map(|(id, value)| {
let name = fields_ids_map.name(id).ok_or(
crate::error::FieldIdMapMissingEntry::FieldId {
field_id: id,
process: "allobkv_to_rhaimap",
},
)?;
let value = serde_json::from_slice(value)
.map_err(crate::error::InternalError::SerdeJson)?;
Ok((name.into(), value))
})
.unwrap();
lua.execute::<()>(&executor).unwrap();
lua.try_enter(|ctx| {
let value = ctx.get_global("doc");
dbg!(value);
Ok(())
})
.unwrap();
.collect()
}
Ok((self, Ok(documents.len())))
fn rhaimap_to_object(map: rhai::Map) -> Object {
let mut output = Object::new();
for (key, value) in map {
let value = serde_json::to_value(&value).unwrap();
output.insert(key.into(), value);
}
output
}
let engine = Engine::new();
let ast = engine.compile(code).unwrap();
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
let primary_key = self.index.primary_key(self.wtxn)?.unwrap();
let primary_key_id = fields_ids_map.id(primary_key).unwrap();
let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
for docid in documents {
let (document, document_id) = match self.index.documents.get(self.wtxn, &docid)? {
Some(obkv) => {
let document_id_bytes = obkv.get(primary_key_id).unwrap();
let document_id: serde_json::Value =
serde_json::from_slice(document_id_bytes).unwrap();
let document = all_obkv_to_rhaimap(obkv, &fields_ids_map)?;
(document, document_id)
}
None => panic!("documents must exist"),
};
let mut scope = Scope::new();
scope.push("doc", document);
let new_document = engine.eval_ast_with_scope::<rhai::Map>(&mut scope, &ast).unwrap();
let new_document = rhaimap_to_object(new_document);
assert_eq!(
document_id, new_document[primary_key],
"you cannot change the document id when editing documents"
);
documents_batch_builder.append_json_object(&new_document)?;
}
let file = documents_batch_builder.into_inner()?;
let reader = DocumentsBatchReader::from_reader(file)?;
self.add_documents(reader)
}
pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {