Executing Lua works correctly

This commit is contained in:
Clément Renault 2024-05-08 23:37:57 +02:00
parent ba85959642
commit efc156a4a4
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 616 additions and 421 deletions

853
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1751,42 +1751,40 @@ fn delete_document_by_filter<'a>(
fn edit_documents_by_function<'a>( fn edit_documents_by_function<'a>(
wtxn: &mut RwTxn<'a>, wtxn: &mut RwTxn<'a>,
filter: &serde_json::Value, filter: &Option<serde_json::Value>,
code: &str, code: &str,
indexer_config: &IndexerConfig, indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing, must_stop_processing: MustStopProcessing,
index: &'a Index, index: &'a Index,
) -> Result<u64> { ) -> Result<u64> {
let filter = Filter::from_json(filter)?; let candidates = match filter.as_ref().map(Filter::from_json) {
Ok(if let Some(filter) = filter { Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err {
let candidates = filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
} }
e => e.into(), e => e.into(),
})?; })?,
None | Some(Ok(None)) => index.documents_ids(wtxn)?,
Some(Err(e)) => return Err(e.into()),
};
let config = IndexDocumentsConfig { let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments, update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default() ..Default::default()
}; };
let mut builder = milli::update::IndexDocuments::new( let mut builder = milli::update::IndexDocuments::new(
wtxn, wtxn,
index, index,
indexer_config, indexer_config,
config, config,
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(), || must_stop_processing.get(),
)?; )?;
todo!("edit documents with the code and reinsert them in the builder") let (new_builder, count) = builder.edit_documents(&candidates, code)?;
// let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?; builder = new_builder;
// builder = new_builder;
// let _ = builder.execute()?; let _ = builder.execute()?;
// count Ok(count.unwrap())
} else {
0
})
} }

View File

@ -93,7 +93,7 @@ impl From<Details> for DetailsView {
Details::DocumentEdition { edited_documents, original_filter, edition_code } => { Details::DocumentEdition { edited_documents, original_filter, edition_code } => {
DetailsView { DetailsView {
edited_documents: Some(edited_documents), edited_documents: Some(edited_documents),
original_filter: Some(Some(original_filter)), original_filter: Some(original_filter),
edition_code: Some(edition_code), edition_code: Some(edition_code),
..DetailsView::default() ..DetailsView::default()
} }

View File

@ -98,7 +98,7 @@ pub enum KindWithContent {
}, },
DocumentEdition { DocumentEdition {
index_uid: String, index_uid: String,
filter_expr: serde_json::Value, filter_expr: Option<serde_json::Value>,
edition_code: String, edition_code: String,
}, },
DocumentDeletion { DocumentDeletion {
@ -214,7 +214,7 @@ impl KindWithContent {
KindWithContent::DocumentEdition { index_uid: _, edition_code, filter_expr } => { KindWithContent::DocumentEdition { index_uid: _, edition_code, filter_expr } => {
Some(Details::DocumentEdition { Some(Details::DocumentEdition {
edited_documents: None, edited_documents: None,
original_filter: filter_expr.to_string(), original_filter: filter_expr.as_ref().map(|v| v.to_string()),
edition_code: edition_code.clone(), edition_code: edition_code.clone(),
}) })
} }
@ -269,7 +269,7 @@ impl KindWithContent {
KindWithContent::DocumentEdition { index_uid: _, filter_expr, edition_code } => { KindWithContent::DocumentEdition { index_uid: _, filter_expr, edition_code } => {
Some(Details::DocumentEdition { Some(Details::DocumentEdition {
edited_documents: Some(0), edited_documents: Some(0),
original_filter: filter_expr.to_string(), original_filter: filter_expr.as_ref().map(|v| v.to_string()),
edition_code: edition_code.clone(), edition_code: edition_code.clone(),
}) })
} }
@ -524,17 +524,48 @@ impl std::error::Error for ParseTaskKindError {}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Details { pub enum Details {
DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option<u64> }, DocumentAdditionOrUpdate {
DocumentEdition { edited_documents: Option<u64>, original_filter: String, edition_code: String }, received_documents: u64,
SettingsUpdate { settings: Box<Settings<Unchecked>> }, indexed_documents: Option<u64>,
IndexInfo { primary_key: Option<String> }, },
DocumentDeletion { provided_ids: usize, deleted_documents: Option<u64> }, DocumentEdition {
DocumentDeletionByFilter { original_filter: String, deleted_documents: Option<u64> }, edited_documents: Option<u64>,
ClearAll { deleted_documents: Option<u64> }, original_filter: Option<String>,
TaskCancelation { matched_tasks: u64, canceled_tasks: Option<u64>, original_filter: String }, edition_code: String,
TaskDeletion { matched_tasks: u64, deleted_tasks: Option<u64>, original_filter: String }, },
Dump { dump_uid: Option<String> }, SettingsUpdate {
IndexSwap { swaps: Vec<IndexSwap> }, settings: Box<Settings<Unchecked>>,
},
IndexInfo {
primary_key: Option<String>,
},
DocumentDeletion {
provided_ids: usize,
deleted_documents: Option<u64>,
},
DocumentDeletionByFilter {
original_filter: String,
deleted_documents: Option<u64>,
},
ClearAll {
deleted_documents: Option<u64>,
},
TaskCancelation {
matched_tasks: u64,
canceled_tasks: Option<u64>,
original_filter: String,
},
TaskDeletion {
matched_tasks: u64,
deleted_tasks: Option<u64>,
original_filter: String,
},
Dump {
dump_uid: Option<String>,
},
IndexSwap {
swaps: Vec<IndexSwap>,
},
} }
impl Details { impl Details {

View File

@ -579,7 +579,7 @@ pub async fn delete_documents_by_filter(
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct DocumentEditionByFunction { pub struct DocumentEditionByFunction {
#[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)] #[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
filter: Value, filter: Option<Value>,
#[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)] #[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
function: String, function: String,
} }
@ -599,12 +599,14 @@ pub async fn edit_documents_by_function(
// analytics.delete_documents(DocumentDeletionKind::PerFilter, &req); // analytics.delete_documents(DocumentDeletionKind::PerFilter, &req);
// we ensure the filter is well formed before enqueuing it if let Some(ref filter) = filter {
|| -> Result<_, ResponseError> { // we ensure the filter is well formed before enqueuing it
Ok(crate::search::parse_filter(&filter)?.ok_or(MeilisearchHttpError::EmptyFilter)?) || -> Result<_, ResponseError> {
}() Ok(crate::search::parse_filter(filter)?.ok_or(MeilisearchHttpError::EmptyFilter)?)
// and whatever was the error, the error code should always be an InvalidDocumentFilter }()
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; // and whatever was the error, the error code should always be an InvalidDocumentFilter
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
}
let task = let task =
KindWithContent::DocumentEdition { index_uid, filter_expr: filter, edition_code: function }; KindWithContent::DocumentEdition { index_uid, filter_expr: filter, edition_code: function };

View File

@ -83,6 +83,7 @@ rand = "0.8.5"
tracing = "0.1.40" tracing = "0.1.40"
ureq = { version = "2.10.0", features = ["json"] } ureq = { version = "2.10.0", features = ["json"] }
url = "2.5.2" url = "2.5.2"
piccolo = "0.3.1"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -40,7 +40,7 @@ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
}; };
use crate::vector::EmbeddingConfigs; use crate::vector::EmbeddingConfigs;
use crate::{CboRoaringBitmapCodec, Index, Result}; use crate::{fields_ids_map, CboRoaringBitmapCodec, Index, Result};
static MERGED_DATABASE_COUNT: usize = 7; static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 4; static PREFIX_DATABASE_COUNT: usize = 4;
@ -172,6 +172,62 @@ where
Ok((self, Ok(indexed_documents))) Ok((self, Ok(indexed_documents)))
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
pub fn edit_documents(
mut self,
documents: &RoaringBitmap,
code: &str,
) -> Result<(Self, StdResult<u64, UserError>)> {
// Early return when there is no document to add
if documents.is_empty() {
return Ok((self, Ok(0)));
}
let mut lua = piccolo::Lua::core();
let executor = lua.enter(|ctx| ctx.stash(piccolo::Executor::new(ctx)));
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
for docid in documents {
let document = match self.index.documents.get(self.wtxn, &docid)? {
Some(document) => document,
None => panic!("a document should always exists"),
};
lua.try_enter(|ctx| {
let closure = match piccolo::Closure::load(
ctx,
None,
("return ".to_string() + code).as_bytes(),
) {
Ok(closure) => closure,
Err(_) => piccolo::Closure::load(ctx, None, code.as_bytes())?,
};
let function = piccolo::Function::Closure(closure);
let table = piccolo::Table::new(&ctx);
table.set(ctx, "internal-id", docid)?;
table.set(ctx, "title", "hello")?;
table.set(ctx, "description", "world")?;
dbg!(&table);
ctx.set_global("doc", table)?;
ctx.fetch(&executor).restart(ctx, function, ());
Ok(())
})
.unwrap();
lua.execute::<()>(&executor).unwrap();
lua.try_enter(|ctx| {
let value = ctx.get_global("doc");
dbg!(value);
Ok(())
})
.unwrap();
}
Ok((self, Ok(documents.len())))
}
pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {
self.embedders = embedders; self.embedders = embedders;
self self