diff --git a/Cargo.lock b/Cargo.lock index 937fce64a..b22b12fdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1987,6 +1987,30 @@ dependencies = [ "serde_json", ] +[[package]] +name = "gc-arena" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d4a9d8c3c1ef4301b8afc383e53e102a13f9947da2181bf82828480dcc5165" +dependencies = [ + "allocator-api2", + "gc-arena-derive", + "hashbrown", + "sptr", +] + +[[package]] +name = "gc-arena-derive" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c952d28a64896b1c4ac382dcd7beeaeaabc13e8c7c7f800ea2938abd828ed30" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", + "synstructure", +] + [[package]] name = "gemm" version = "0.17.1" @@ -3511,6 +3535,7 @@ dependencies = [ "obkv", "once_cell", "ordered-float", + "piccolo", "puffin", "rand", "rand_pcg", @@ -4014,6 +4039,21 @@ dependencies = [ "siphasher 0.3.11", ] +[[package]] +name = "piccolo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93dd1815b42446904bb2689d1c5d7680e8c68113d5b15a5a3297ba6c7a5f84af" +dependencies = [ + "ahash", + "allocator-api2", + "anyhow", + "gc-arena", + "hashbrown", + "rand", + "thiserror", +] + [[package]] name = "pin-project" version = "1.1.4" @@ -4899,6 +4939,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "sptr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index e8aa25220..55bca2725 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1699,42 +1699,40 @@ fn delete_document_by_filter<'a>( fn edit_documents_by_function<'a>( wtxn: &mut RwTxn<'a>, - filter: &serde_json::Value, + filter: &Option, code: &str, indexer_config: &IndexerConfig, must_stop_processing: MustStopProcessing, index: &'a Index, ) -> Result { - let filter = Filter::from_json(filter)?; - Ok(if let Some(filter) = filter { - let candidates = filter.evaluate(wtxn, index).map_err(|err| match err { + let candidates = match filter.as_ref().map(Filter::from_json) { + Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) } e => e.into(), - })?; + })?, + None | Some(Ok(None)) => index.documents_ids(wtxn)?, + Some(Err(e)) => return Err(e.into()), + }; - let config = IndexDocumentsConfig { - update_method: IndexDocumentsMethod::ReplaceDocuments, - ..Default::default() - }; + let config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; - let mut builder = milli::update::IndexDocuments::new( - wtxn, - index, - indexer_config, - config, - |indexing_step| tracing::debug!(update = ?indexing_step), - || must_stop_processing.get(), - )?; + let mut builder = milli::update::IndexDocuments::new( + wtxn, + index, + indexer_config, + config, + |indexing_step| tracing::debug!(update = ?indexing_step), + || must_stop_processing.get(), + )?; - todo!("edit documents with the code and reinsert them in the builder") - // let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?; - // builder = new_builder; + let (new_builder, count) = builder.edit_documents(&candidates, code)?; + builder = new_builder; - // let _ = builder.execute()?; - // count - } else { - 0 - }) + let _ = builder.execute()?; + Ok(count.unwrap()) } diff --git a/meilisearch-types/src/task_view.rs b/meilisearch-types/src/task_view.rs index d718ee33a..b8e55e8a8 100644 --- a/meilisearch-types/src/task_view.rs +++ b/meilisearch-types/src/task_view.rs @@ -93,7 +93,7 @@ impl From
for DetailsView { Details::DocumentEdition { edited_documents, original_filter, edition_code } => { DetailsView { edited_documents: Some(edited_documents), - original_filter: Some(Some(original_filter)), + original_filter: Some(original_filter), edition_code: Some(edition_code), ..DetailsView::default() } diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index e6bb57cf7..0501ed5ff 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -98,7 +98,7 @@ pub enum KindWithContent { }, DocumentEdition { index_uid: String, - filter_expr: serde_json::Value, + filter_expr: Option, edition_code: String, }, DocumentDeletion { @@ -214,7 +214,7 @@ impl KindWithContent { KindWithContent::DocumentEdition { index_uid: _, edition_code, filter_expr } => { Some(Details::DocumentEdition { edited_documents: None, - original_filter: filter_expr.to_string(), + original_filter: filter_expr.as_ref().map(|v| v.to_string()), edition_code: edition_code.clone(), }) } @@ -269,7 +269,7 @@ impl KindWithContent { KindWithContent::DocumentEdition { index_uid: _, filter_expr, edition_code } => { Some(Details::DocumentEdition { edited_documents: Some(0), - original_filter: filter_expr.to_string(), + original_filter: filter_expr.as_ref().map(|v| v.to_string()), edition_code: edition_code.clone(), }) } @@ -524,17 +524,48 @@ impl std::error::Error for ParseTaskKindError {} #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub enum Details { - DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option }, - DocumentEdition { edited_documents: Option, original_filter: String, edition_code: String }, - SettingsUpdate { settings: Box> }, - IndexInfo { primary_key: Option }, - DocumentDeletion { provided_ids: usize, deleted_documents: Option }, - DocumentDeletionByFilter { original_filter: String, deleted_documents: Option }, - ClearAll { deleted_documents: Option }, - TaskCancelation { matched_tasks: u64, canceled_tasks: Option, original_filter: String }, - TaskDeletion { matched_tasks: u64, deleted_tasks: Option, original_filter: String }, - Dump { dump_uid: Option }, - IndexSwap { swaps: Vec }, + DocumentAdditionOrUpdate { + received_documents: u64, + indexed_documents: Option, + }, + DocumentEdition { + edited_documents: Option, + original_filter: Option, + edition_code: String, + }, + SettingsUpdate { + settings: Box>, + }, + IndexInfo { + primary_key: Option, + }, + DocumentDeletion { + provided_ids: usize, + deleted_documents: Option, + }, + DocumentDeletionByFilter { + original_filter: String, + deleted_documents: Option, + }, + ClearAll { + deleted_documents: Option, + }, + TaskCancelation { + matched_tasks: u64, + canceled_tasks: Option, + original_filter: String, + }, + TaskDeletion { + matched_tasks: u64, + deleted_tasks: Option, + original_filter: String, + }, + Dump { + dump_uid: Option, + }, + IndexSwap { + swaps: Vec, + }, } impl Details { diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 8914a39cf..a98af35ba 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -558,7 +558,7 @@ pub async fn delete_documents_by_filter( #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] pub struct DocumentEditionByFunction { #[deserr(error = DeserrJsonError, missing_field_error = DeserrJsonError::missing_document_filter)] - filter: Value, + filter: Option, #[deserr(error = DeserrJsonError, missing_field_error = DeserrJsonError::missing_document_filter)] function: String, } @@ -578,12 +578,14 @@ pub async fn edit_documents_by_function( // analytics.delete_documents(DocumentDeletionKind::PerFilter, &req); - // we ensure the filter is well formed before enqueuing it - || -> Result<_, ResponseError> { - Ok(crate::search::parse_filter(&filter)?.ok_or(MeilisearchHttpError::EmptyFilter)?) - }() - // and whatever was the error, the error code should always be an InvalidDocumentFilter - .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; + if let Some(ref filter) = filter { + // we ensure the filter is well formed before enqueuing it + || -> Result<_, ResponseError> { + Ok(crate::search::parse_filter(filter)?.ok_or(MeilisearchHttpError::EmptyFilter)?) + }() + // and whatever was the error, the error code should always be an InvalidDocumentFilter + .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; + } let task = KindWithContent::DocumentEdition { index_uid, filter_expr: filter, edition_code: function }; diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 7d903178b..017d8c93d 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -87,6 +87,7 @@ rand = "0.8.5" tracing = "0.1.40" ureq = { version = "2.9.7", features = ["json"] } url = "2.5.0" +piccolo = "0.3.1" [dev-dependencies] mimalloc = { version = "0.1.39", default-features = false } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index bb180a7ee..3072d55b5 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -39,7 +39,7 @@ use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, }; use crate::vector::EmbeddingConfigs; -use crate::{CboRoaringBitmapCodec, Index, Result}; +use crate::{fields_ids_map, CboRoaringBitmapCodec, Index, Result}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 4; @@ -173,6 +173,62 @@ where Ok((self, Ok(indexed_documents))) } + #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] + pub fn edit_documents( + mut self, + documents: &RoaringBitmap, + code: &str, + ) -> Result<(Self, StdResult)> { + // Early return when there is no document to add + if documents.is_empty() { + return Ok((self, Ok(0))); + } + + let mut lua = piccolo::Lua::core(); + let executor = lua.enter(|ctx| ctx.stash(piccolo::Executor::new(ctx))); + let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; + + for docid in documents { + let document = match self.index.documents.get(self.wtxn, &docid)? { + Some(document) => document, + None => panic!("a document should always exists"), + }; + + lua.try_enter(|ctx| { + let closure = match piccolo::Closure::load( + ctx, + None, + ("return ".to_string() + code).as_bytes(), + ) { + Ok(closure) => closure, + Err(_) => piccolo::Closure::load(ctx, None, code.as_bytes())?, + }; + let function = piccolo::Function::Closure(closure); + + let table = piccolo::Table::new(&ctx); + table.set(ctx, "internal-id", docid)?; + table.set(ctx, "title", "hello")?; + table.set(ctx, "description", "world")?; + dbg!(&table); + ctx.set_global("doc", table)?; + + ctx.fetch(&executor).restart(ctx, function, ()); + Ok(()) + }) + .unwrap(); + + lua.execute::<()>(&executor).unwrap(); + lua.try_enter(|ctx| { + let value = ctx.get_global("doc"); + dbg!(value); + Ok(()) + }) + .unwrap(); + } + + Ok((self, Ok(documents.len()))) + } + pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { self.embedders = embedders; self