diff --git a/dump/src/lib.rs b/dump/src/lib.rs index b1b6807a0..a3e892c03 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -101,6 +101,9 @@ pub enum KindDump { documents_ids: Vec, }, DocumentClear, + DocumentDeletionByFilter { + filter: String, + }, Settings { settings: Box>, is_deletion: bool, @@ -166,6 +169,9 @@ impl From for KindDump { KindWithContent::DocumentDeletion { documents_ids, .. } => { KindDump::DocumentDeletion { documents_ids } } + KindWithContent::DocumentDeletionByFilter { filter_expr, .. } => { + KindDump::DocumentDeletionByFilter { filter: filter_expr.to_string() } + } KindWithContent::DocumentClear { .. } => KindDump::DocumentClear, KindWithContent::SettingsUpdate { new_settings, diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 24625a7fb..d738cc5e4 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -25,6 +25,7 @@ enum AutobatchKind { primary_key: Option, }, DocumentDeletion, + DocumentDeletionByFilter, DocumentClear, Settings { allow_index_creation: bool, @@ -64,6 +65,9 @@ impl From for AutobatchKind { } => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key }, KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion, KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear, + KindWithContent::DocumentDeletionByFilter { .. } => { + AutobatchKind::DocumentDeletionByFilter + } KindWithContent::SettingsUpdate { allow_index_creation, is_deletion, .. } => { AutobatchKind::Settings { allow_index_creation: allow_index_creation && !is_deletion, @@ -97,6 +101,9 @@ pub enum BatchKind { DocumentDeletion { deletion_ids: Vec, }, + DocumentDeletionByFilter { + id: TaskId, + }, ClearAndSettings { other: Vec, allow_index_creation: bool, @@ -195,6 +202,9 @@ impl BatchKind { K::DocumentDeletion => { (Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false) } + K::DocumentDeletionByFilter => { + (Break(BatchKind::DocumentDeletionByFilter { id: task_id }), false) + } K::Settings { allow_index_creation } => ( Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id] }), allow_index_creation, @@ -212,7 +222,7 @@ impl BatchKind { match (self, kind) { // We don't batch any of these operations - (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap) => Break(this), + (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this), // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists. (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break(this) @@ -471,7 +481,8 @@ impl BatchKind { BatchKind::IndexCreation { .. } | BatchKind::IndexDeletion { .. } | BatchKind::IndexUpdate { .. } - | BatchKind::IndexSwap { .. }, + | BatchKind::IndexSwap { .. } + | BatchKind::DocumentDeletionByFilter { .. }, _, ) => { unreachable!() diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 9c24e8e9d..ff123f1e3 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -28,9 +28,10 @@ use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::update::{ - DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings, + DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, + Settings as MilliSettings, }; -use meilisearch_types::milli::{self, BEU32}; +use meilisearch_types::milli::{self, Filter, BEU32}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; @@ -65,6 +66,10 @@ pub(crate) enum Batch { op: IndexOperation, must_create_index: bool, }, + IndexDocumentDeletionByFilter { + index_uid: String, + task: Task, + }, IndexCreation { index_uid: String, primary_key: Option, @@ -149,6 +154,7 @@ impl Batch { | Batch::TaskDeletion(task) | Batch::Dump(task) | Batch::IndexCreation { task, .. } + | Batch::IndexDocumentDeletionByFilter { task, .. } | Batch::IndexUpdate { task, .. } => vec![task.uid], Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => { tasks.iter().map(|task| task.uid).collect() @@ -187,7 +193,8 @@ impl Batch { IndexOperation { op, .. } => Some(op.index_uid()), IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid, .. } => Some(index_uid), + | IndexDeletion { index_uid, .. } + | IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid), } } } @@ -227,6 +234,18 @@ impl IndexScheduler { }, must_create_index, })), + BatchKind::DocumentDeletionByFilter { id } => { + let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + match &task.kind { + KindWithContent::DocumentDeletionByFilter { index_uid, .. } => { + Ok(Some(Batch::IndexDocumentDeletionByFilter { + index_uid: index_uid.clone(), + task, + })) + } + _ => unreachable!(), + } + } BatchKind::DocumentOperation { method, operation_ids, .. } => { let tasks = self.get_existing_tasks(rtxn, operation_ids)?; let primary_key = tasks @@ -867,6 +886,64 @@ impl IndexScheduler { Ok(tasks) } + Batch::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => { + let (index_uid, filter) = + if let KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } = + &task.kind + { + (index_uid, filter_expr) + } else { + unreachable!() + }; + let index = { + let rtxn = self.env.read_txn()?; + self.index_mapper.index(&rtxn, index_uid)? + }; + let filter = Filter::from_json(filter)?; + let deleted_documents = if let Some(filter) = filter { + let index_rtxn = index.read_txn()?; + + let candidates = filter.evaluate(&index_rtxn, &index)?; + let mut wtxn = index.write_txn()?; + let mut delete_operation = DeleteDocuments::new(&mut wtxn, &index)?; + delete_operation.delete_documents(&candidates); + let result = delete_operation.execute().map(|result| result.deleted_documents); + wtxn.commit()?; + result + } else { + Ok(0) + }; + let original_filter = if let Some(Details::DocumentDeletionByFilter { + original_filter, + deleted_documents: _, + }) = task.details + { + original_filter + } else { + // In the case of a `documentDeleteByFilter` the details MUST be set + unreachable!(); + }; + + match deleted_documents { + Ok(deleted_documents) => { + task.status = Status::Succeeded; + task.details = Some(Details::DocumentDeletionByFilter { + original_filter, + deleted_documents: Some(deleted_documents), + }); + } + Err(e) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentDeletionByFilter { + original_filter, + deleted_documents: Some(0), + }); + task.error = Some(e.into()); + } + } + + Ok(vec![task]) + } Batch::IndexCreation { index_uid, primary_key, task } => { let wtxn = self.env.write_txn()?; if self.index_mapper.exists(&wtxn, &index_uid)? { diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 43509aa84..3dae2800b 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -183,6 +183,9 @@ fn snapshot_details(d: &Details) -> String { provided_ids: received_document_ids, deleted_documents, } => format!("{{ received_document_ids: {received_document_ids}, deleted_documents: {deleted_documents:?} }}"), + Details::DocumentDeletionByFilter { original_filter, deleted_documents } => format!( + "{{ original_filter: {original_filter}, deleted_documents: {deleted_documents:?} }}" + ), Details::ClearAll { deleted_documents } => { format!("{{ deleted_documents: {deleted_documents:?} }}") }, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d713fca17..4aef427e5 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1208,6 +1208,13 @@ impl<'a> Dump<'a> { documents_ids, index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, }, + KindDump::DocumentDeletionByFilter { filter } => { + KindWithContent::DocumentDeletionByFilter { + filter_expr: serde_json::from_str(&filter) + .map_err(|_| Error::CorruptedDump)?, + index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, + } + } KindDump::DocumentClear => KindWithContent::DocumentClear { index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, }, diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 7718e1af0..97f437bed 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -239,6 +239,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { match &mut task.kind { K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid), K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid), + K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid), K::DocumentClear { index_uid } => index_uids.push(index_uid), K::SettingsUpdate { index_uid, .. } => index_uids.push(index_uid), K::IndexDeletion { index_uid } => index_uids.push(index_uid), @@ -464,6 +465,29 @@ impl IndexScheduler { } } } + Details::DocumentDeletionByFilter { deleted_documents, original_filter: _ } => { + assert_eq!(kind.as_kind(), Kind::DocumentDeletionByFilter); + let (index_uid, _) = if let KindWithContent::DocumentDeletionByFilter { + ref index_uid, + ref filter_expr, + } = kind + { + (index_uid, filter_expr) + } else { + unreachable!() + }; + assert_eq!(&task_index_uid.unwrap(), index_uid); + + match status { + Status::Enqueued | Status::Processing => (), + Status::Succeeded => { + assert!(deleted_documents.is_some()); + } + Status::Failed | Status::Canceled => { + assert!(deleted_documents == Some(0)); + } + } + } Details::ClearAll { deleted_documents } => { assert!(matches!( kind.as_kind(), diff --git a/meilisearch-types/src/error.rs b/meilisearch-types/src/error.rs index a201db7ac..c36d27529 100644 --- a/meilisearch-types/src/error.rs +++ b/meilisearch-types/src/error.rs @@ -315,6 +315,7 @@ impl ErrorCode for milli::Error { UserError::MaxDatabaseSizeReached => Code::DatabaseSizeLimitReached, UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded, UserError::InvalidFilter(_) => Code::InvalidSearchFilter, + UserError::InvalidFilterExpression(..) => Code::InvalidSearchFilter, UserError::MissingDocumentId { .. } => Code::MissingDocumentId, UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => { Code::InvalidDocumentId diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 3b7efb97b..88263d150 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -49,6 +49,7 @@ impl Task { | IndexSwap { .. } => None, DocumentAdditionOrUpdate { index_uid, .. } | DocumentDeletion { index_uid, .. } + | DocumentDeletionByFilter { index_uid, .. } | DocumentClear { index_uid } | SettingsUpdate { index_uid, .. } | IndexCreation { index_uid, .. } @@ -67,6 +68,7 @@ impl Task { match self.kind { KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file), KindWithContent::DocumentDeletion { .. } + | KindWithContent::DocumentDeletionByFilter { .. } | KindWithContent::DocumentClear { .. } | KindWithContent::SettingsUpdate { .. } | KindWithContent::IndexDeletion { .. } @@ -81,6 +83,11 @@ impl Task { } } +pub enum DocumentDeletionContent { + ByDocumentIds(Vec), + ByFilter(serde_json::Value), +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum KindWithContent { @@ -96,6 +103,10 @@ pub enum KindWithContent { index_uid: String, documents_ids: Vec, }, + DocumentDeletionByFilter { + index_uid: String, + filter_expr: serde_json::Value, + }, DocumentClear { index_uid: String, }, @@ -145,6 +156,7 @@ impl KindWithContent { match self { KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate, KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, + KindWithContent::DocumentDeletionByFilter { .. } => Kind::DocumentDeletion, KindWithContent::DocumentClear { .. } => Kind::DocumentDeletion, KindWithContent::SettingsUpdate { .. } => Kind::SettingsUpdate, KindWithContent::IndexCreation { .. } => Kind::IndexCreation, @@ -168,6 +180,7 @@ impl KindWithContent { | TaskDeletion { .. } => vec![], DocumentAdditionOrUpdate { index_uid, .. } | DocumentDeletion { index_uid, .. } + | DocumentDeletionByFilter { index_uid, .. } | DocumentClear { index_uid } | SettingsUpdate { index_uid, .. } | IndexCreation { index_uid, .. } @@ -200,6 +213,12 @@ impl KindWithContent { deleted_documents: None, }) } + KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { + Some(Details::DocumentDeletionByFilter { + original_filter: filter_expr.to_string(), + deleted_documents: None, + }) + } KindWithContent::DocumentClear { .. } | KindWithContent::IndexDeletion { .. } => { Some(Details::ClearAll { deleted_documents: None }) } @@ -242,6 +261,12 @@ impl KindWithContent { deleted_documents: Some(0), }) } + KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => { + Some(Details::DocumentDeletionByFilter { + original_filter: filter_expr.to_string(), + deleted_documents: Some(0), + }) + } KindWithContent::DocumentClear { .. } => { Some(Details::ClearAll { deleted_documents: None }) } @@ -282,6 +307,7 @@ impl From<&KindWithContent> for Option
{ }) } KindWithContent::DocumentDeletion { .. } => None, + KindWithContent::DocumentDeletionByFilter { .. } => None, KindWithContent::DocumentClear { .. } => None, KindWithContent::SettingsUpdate { new_settings, .. } => { Some(Details::SettingsUpdate { settings: new_settings.clone() }) @@ -374,6 +400,7 @@ impl std::error::Error for ParseTaskStatusError {} pub enum Kind { DocumentAdditionOrUpdate, DocumentDeletion, + DocumentDeletionByFilter, SettingsUpdate, IndexCreation, IndexDeletion, @@ -390,6 +417,7 @@ impl Kind { match self { Kind::DocumentAdditionOrUpdate | Kind::DocumentDeletion + | Kind::DocumentDeletionByFilter | Kind::SettingsUpdate | Kind::IndexCreation | Kind::IndexDeletion @@ -407,6 +435,7 @@ impl Display for Kind { match self { Kind::DocumentAdditionOrUpdate => write!(f, "documentAdditionOrUpdate"), Kind::DocumentDeletion => write!(f, "documentDeletion"), + Kind::DocumentDeletionByFilter => write!(f, "documentDeletionByFilter"), Kind::SettingsUpdate => write!(f, "settingsUpdate"), Kind::IndexCreation => write!(f, "indexCreation"), Kind::IndexDeletion => write!(f, "indexDeletion"), @@ -478,6 +507,7 @@ pub enum Details { SettingsUpdate { settings: Box> }, IndexInfo { primary_key: Option }, DocumentDeletion { provided_ids: usize, deleted_documents: Option }, + DocumentDeletionByFilter { original_filter: String, deleted_documents: Option }, ClearAll { deleted_documents: Option }, TaskCancelation { matched_tasks: u64, canceled_tasks: Option, original_filter: String }, TaskDeletion { matched_tasks: u64, deleted_tasks: Option, original_filter: String }, @@ -493,6 +523,9 @@ impl Details { *indexed_documents = Some(0) } Self::DocumentDeletion { deleted_documents, .. } => *deleted_documents = Some(0), + Self::DocumentDeletionByFilter { deleted_documents, .. } => { + *deleted_documents = Some(0) + } Self::ClearAll { deleted_documents } => *deleted_documents = Some(0), Self::TaskCancelation { canceled_tasks, .. } => *canceled_tasks = Some(0), Self::TaskDeletion { deleted_tasks, .. } => *deleted_tasks = Some(0), diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index 5c1e385fd..8c8229429 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -17,7 +17,6 @@ use meilisearch_types::error::{Code, ResponseError}; use meilisearch_types::heed::RoTxn; use meilisearch_types::index_uid::IndexUid; use meilisearch_types::milli::update::IndexDocumentsMethod; -use meilisearch_types::milli::InternalError; use meilisearch_types::star_or::OptionStarOrList; use meilisearch_types::tasks::KindWithContent; use meilisearch_types::{milli, Document, Index}; @@ -381,31 +380,6 @@ pub enum DocumentDeletionQuery { Object { filter: Option }, } -/// Parses a Json encoded document id and validate it, returning a user error when it is one. -/// FIXME: stolen from milli -fn validate_document_id_value(document_id: Value) -> String { - match document_id { - Value::String(string) => match validate_document_id(&string) { - Some(s) if s.len() == string.len() => string, - Some(s) => s.to_string(), - None => panic!(), - }, - Value::Number(number) if number.is_i64() => number.to_string(), - _content => panic!(), - } -} - -/// FIXME: stolen from milli -fn validate_document_id(document_id: &str) -> Option<&str> { - if !document_id.is_empty() - && document_id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_')) - { - Some(document_id) - } else { - None - } -} - pub async fn delete_documents( index_scheduler: GuardedData, Data>, index_uid: web::Path, @@ -428,34 +402,22 @@ pub async fn delete_documents( debug!("filter: {:?}", filter); // FIXME: spawn_blocking - if let Some(ref filter) = filter { - if let Some(facets) = crate::search::parse_filter(filter)? { + if let Some(mut filter) = filter { + if let Some(facets) = crate::search::parse_filter(&filter)? { debug!("facets: {:?}", facets); - let index = index_scheduler.index(&index_uid)?; - let rtxn = index.read_txn()?; - let filtered_candidates = facets.evaluate(&rtxn, &index)?; - debug!("filtered_candidates.len(): {:?}", filtered_candidates.len()); + let task = KindWithContent::DocumentDeletionByFilter { + index_uid: index_uid.to_string(), + filter_expr: filter.take(), + }; - // FIXME: unwraps - let primary_key = index.primary_key(&rtxn)?.unwrap(); - let primary_key = index.fields_ids_map(&rtxn)?.id(primary_key).unwrap(); + let task: SummarizedTaskView = + tokio::task::spawn_blocking(move || index_scheduler.register(task)) + .await?? + .into(); - let documents = index.documents(&rtxn, filtered_candidates.into_iter())?; - debug!("documents.len(): {:?}", documents.len()); - let documents: Vec = documents - .into_iter() - .map(|(_, document)| { - let value = document.get(primary_key).unwrap(); - let value: Value = serde_json::from_slice(value) - .map_err(InternalError::SerdeJson) - .unwrap(); - - validate_document_id_value(value) - }) - .collect(); - debug!("documents: {:?}", documents); - documents + debug!("returns: {:?}", task); + return Ok(HttpResponse::Accepted().json(task)); } else { vec![] } diff --git a/meilisearch/src/routes/tasks.rs b/meilisearch/src/routes/tasks.rs index 1356ff722..122225d0a 100644 --- a/meilisearch/src/routes/tasks.rs +++ b/meilisearch/src/routes/tasks.rs @@ -133,6 +133,13 @@ impl From
for DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() }, + Details::DocumentDeletionByFilter { original_filter, deleted_documents } => { + DetailsView { + original_filter: Some(original_filter), + deleted_documents: Some(deleted_documents), + ..DetailsView::default() + } + } Details::ClearAll { deleted_documents } => { DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() } } diff --git a/milli/src/error.rs b/milli/src/error.rs index 9b1ec0a87..7f0faf2fd 100644 --- a/milli/src/error.rs +++ b/milli/src/error.rs @@ -112,6 +112,8 @@ only composed of alphanumeric characters (a-z A-Z 0-9), hyphens (-) and undersco InvalidGeoField(#[from] GeoError), #[error("{0}")] InvalidFilter(String), + #[error("Invalid type for filter subexpression: `expected {}, found: {1}`.", .0.join(", "))] + InvalidFilterExpression(&'static [&'static str], Value), #[error("Attribute `{}` is not sortable. {}", .field, match .valid_fields.is_empty() { diff --git a/milli/src/search/facet/filter.rs b/milli/src/search/facet/filter.rs index 19d763107..fac7b68ea 100644 --- a/milli/src/search/facet/filter.rs +++ b/milli/src/search/facet/filter.rs @@ -5,6 +5,7 @@ use std::ops::Bound::{self, Excluded, Included}; use either::Either; pub use filter_parser::{Condition, Error as FPError, FilterCondition, Span, Token}; use roaring::RoaringBitmap; +use serde_json::Value; use super::facet_range_search; use crate::error::{Error, UserError}; @@ -112,6 +113,52 @@ impl<'a> From> for FilterCondition<'a> { } impl<'a> Filter<'a> { + pub fn from_json(facets: &'a Value) -> Result> { + match facets { + Value::String(expr) => { + let condition = Filter::from_str(expr)?; + Ok(condition) + } + Value::Array(arr) => Self::parse_filter_array(arr), + v => Err(Error::UserError(UserError::InvalidFilterExpression( + &["String", "Array"], + v.clone(), + ))), + } + } + + fn parse_filter_array(arr: &'a [Value]) -> Result> { + let mut ands = Vec::new(); + for value in arr { + match value { + Value::String(s) => ands.push(Either::Right(s.as_str())), + Value::Array(arr) => { + let mut ors = Vec::new(); + for value in arr { + match value { + Value::String(s) => ors.push(s.as_str()), + v => { + return Err(Error::UserError(UserError::InvalidFilterExpression( + &["String"], + v.clone(), + ))) + } + } + } + ands.push(Either::Left(ors)); + } + v => { + return Err(Error::UserError(UserError::InvalidFilterExpression( + &["String", "[String]"], + v.clone(), + ))) + } + } + } + + Filter::from_array(ands) + } + pub fn from_array(array: I) -> Result> where I: IntoIterator>,