From 441641397b28cc2baae18b117d92feab512283f0 Mon Sep 17 00:00:00 2001
From: Louis Dureuil <louis@meilisearch.com>
Date: Wed, 8 Mar 2023 09:44:43 +0100
Subject: [PATCH] Implement document get with filters

---
 meilisearch/src/routes/indexes/documents.rs | 117 +++++++++++++++-----
 1 file changed, 92 insertions(+), 25 deletions(-)

diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index a6ee8d16e..8aa036239 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -17,6 +17,7 @@ use meilisearch_types::error::{Code, ResponseError};
 use meilisearch_types::heed::RoTxn;
 use meilisearch_types::index_uid::IndexUid;
 use meilisearch_types::milli::update::IndexDocumentsMethod;
+use meilisearch_types::milli::DocumentId;
 use meilisearch_types::star_or::OptionStarOrList;
 use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::{milli, Document, Index};
@@ -36,6 +37,7 @@ use crate::extractors::authentication::GuardedData;
 use crate::extractors::payload::Payload;
 use crate::extractors::sequential_extractor::SeqHandler;
 use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
+use crate::search::parse_filter;
 
 static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
     vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
@@ -66,7 +68,7 @@ pub struct DocumentParam {
 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(
         web::resource("")
-            .route(web::get().to(SeqHandler(get_all_documents)))
+            .route(web::get().to(SeqHandler(get_documents)))
             .route(web::post().to(SeqHandler(replace_documents)))
             .route(web::put().to(SeqHandler(update_documents)))
             .route(web::delete().to(SeqHandler(clear_all_documents))),
@@ -76,6 +78,7 @@
         web::resource("/delete-batch").route(web::post().to(SeqHandler(delete_documents_batch))),
     )
     .service(web::resource("/delete").route(web::post().to(SeqHandler(delete_documents_by_filter))))
+    .service(web::resource("/fetch").route(web::post().to(SeqHandler(documents_by_query_post))))
     .service(
         web::resource("/{document_id}")
             .route(web::get().to(SeqHandler(get_document)))
@@ -130,29 +133,76 @@ pub async fn delete_document(
 
 #[derive(Debug, Deserr)]
 #[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
-pub struct BrowseQuery {
+pub struct BrowseQueryGet {
     #[deserr(default, error = DeserrQueryParamError<InvalidDocumentOffset>)]
     offset: Param<usize>,
     #[deserr(default = Param(PAGINATION_DEFAULT_LIMIT), error = DeserrQueryParamError<InvalidDocumentLimit>)]
     limit: Param<usize>,
     #[deserr(default, error = DeserrQueryParamError<InvalidDocumentFields>)]
     fields: OptionStarOrList<String>,
+    #[deserr(default, error = DeserrQueryParamError<InvalidDocumentFilter>)]
+    filter: Option<String>,
 }
 
-pub async fn get_all_documents(
+#[derive(Debug, Deserr)]
+#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
+pub struct BrowseQuery {
+    #[deserr(default, error = DeserrJsonError<InvalidDocumentOffset>)]
+    offset: usize,
+    #[deserr(default=PAGINATION_DEFAULT_LIMIT, error = DeserrJsonError<InvalidDocumentLimit>)]
+    limit: usize,
+    #[deserr(default, error = DeserrJsonError<InvalidDocumentFields>)]
+    fields: OptionStarOrList<String>,
+    #[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
+    filter: Option<Value>,
+}
+
+pub async fn documents_by_query_post(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
-    params: AwebQueryParameter<BrowseQuery, DeserrQueryParamError>,
+    body: AwebJson<BrowseQuery, DeserrJsonError>,
+) -> Result<HttpResponse, ResponseError> {
+    debug!("called with body: {:?}", body);
+
+    documents_by_query(&index_scheduler, index_uid, body.into_inner())
+}
+
+pub async fn get_documents(
+    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
+    index_uid: web::Path<String>,
+    params: AwebQueryParameter<BrowseQueryGet, DeserrQueryParamError>,
+) -> Result<HttpResponse, ResponseError> {
+    debug!("called with params: {:?}", params);
+
+    let BrowseQueryGet { limit, offset, fields, filter } = params.into_inner();
+
+    let filter = match filter {
+        Some(f) => match serde_json::from_str(&f) {
+            Ok(v) => Some(v),
+            _ => Some(Value::String(f)),
+        },
+        None => None,
+    };
+
+    let query = BrowseQuery { offset: offset.0, limit: limit.0, fields, filter };
+
+    documents_by_query(&index_scheduler, index_uid, query)
+}
+
+fn documents_by_query(
+    index_scheduler: &IndexScheduler,
+    index_uid: web::Path<String>,
+    query: BrowseQuery,
 ) -> Result<HttpResponse, ResponseError> {
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
-    debug!("called with params: {:?}", params);
-    let BrowseQuery { limit, offset, fields } = params.into_inner();
+    let BrowseQuery { offset, limit, fields, filter } = query;
     let attributes_to_retrieve = fields.merge_star_and_none();
 
     let index = index_scheduler.index(&index_uid)?;
-    let (total, documents) = retrieve_documents(&index, offset.0, limit.0, attributes_to_retrieve)?;
+    let (total, documents) =
+        retrieve_documents(&index, offset, limit, filter, attributes_to_retrieve)?;
 
-    let ret = PaginationView::new(offset.0, limit.0, total as usize, documents);
+    let ret = PaginationView::new(offset, limit, total as usize, documents);
 
     debug!("returns: {:?}", ret);
     Ok(HttpResponse::Ok().json(ret))
@@ -455,14 +505,15 @@ pub async fn clear_all_documents(
     Ok(HttpResponse::Accepted().json(task))
 }
 
-fn all_documents<'a>(
-    index: &Index,
-    rtxn: &'a RoTxn,
+fn some_documents<'a, 't: 'a>(
+    index: &'a Index,
+    rtxn: &'t RoTxn,
+    doc_ids: impl IntoIterator<Item = DocumentId> + 'a,
 ) -> Result<impl Iterator<Item = Result<Document, ResponseError>> + 'a, ResponseError> {
     let fields_ids_map = index.fields_ids_map(rtxn)?;
     let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
 
-    Ok(index.all_documents(rtxn)?.map(move |ret| {
+    Ok(index.iter_documents(rtxn, doc_ids)?.map(move |ret| {
         ret.map_err(ResponseError::from).and_then(|(_key, document)| -> Result<_, ResponseError> {
             Ok(milli::obkv_to_json(&all_fields, &fields_ids_map, document)?)
         })
@@ -473,24 +524,40 @@ fn retrieve_documents<S: AsRef<str>>(
     index: &Index,
     offset: usize,
     limit: usize,
+    filter: Option<Value>,
     attributes_to_retrieve: Option<Vec<S>>,
 ) -> Result<(u64, Vec<Document>), ResponseError> {
     let rtxn = index.read_txn()?;
+    let filter = &filter;
+    let filter = if let Some(filter) = filter { parse_filter(filter)? } else { None };
 
-    let mut documents = Vec::new();
-    for document in all_documents(index, &rtxn)?.skip(offset).take(limit) {
-        let document = match &attributes_to_retrieve {
-            Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
-                &document?,
-                attributes_to_retrieve.iter().map(|s| s.as_ref()),
-            ),
-            None => document?,
-        };
-        documents.push(document);
-    }
+    let candidates = if let Some(filter) = filter {
+        filter.evaluate(&rtxn, index)?
+    } else {
+        index.documents_ids(&rtxn)?
+    };
 
-    let number_of_documents = index.number_of_documents(&rtxn)?;
-    Ok((number_of_documents, documents))
+    let (it, number_of_documents) = {
+        let number_of_documents = candidates.len();
+        (
+            some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
+            number_of_documents,
+        )
+    };
+
+    let documents: Result<Vec<_>, ResponseError> = it
+        .map(|document| {
+            Ok(match &attributes_to_retrieve {
+                Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
+                    &document?,
+                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
+                ),
+                None => document?,
+            })
+        })
+        .collect();
+
+    Ok((number_of_documents, documents?))
 }
 
 fn retrieve_document<S: AsRef<str>>(
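
A note on the filter plumbing above: the `GET` route receives `filter` as a raw query-string value, so `get_documents` first tries to parse it as JSON (e.g. the array syntax `["doggo = bernese"]`) and falls back to keeping it as a plain string expression, while the `POST /documents/fetch` route receives `filter` directly as a JSON value in the body. A minimal standalone sketch of that coercion, assuming only `serde_json` (the helper name `coerce_filter` is illustrative, not part of the patch):

```rust
use serde_json::Value;

// Mirrors the fallback in `get_documents`: try JSON first, otherwise keep
// the raw text as a string filter expression for `parse_filter` to handle.
fn coerce_filter(raw: Option<String>) -> Option<Value> {
    raw.map(|f| serde_json::from_str(&f).unwrap_or(Value::String(f)))
}

fn main() {
    // JSON array syntax is preserved as structured JSON...
    assert_eq!(
        coerce_filter(Some(r#"["doggo = bernese"]"#.to_string())),
        Some(serde_json::json!(["doggo = bernese"]))
    );
    // ...while a bare expression stays a string for later parsing.
    assert_eq!(
        coerce_filter(Some("doggo = bernese".to_string())),
        Some(Value::String("doggo = bernese".to_string()))
    );
    assert_eq!(coerce_filter(None), None);
}
```

Either form ends up in `BrowseQuery::filter` and flows through `documents_by_query` into `retrieve_documents`, where `parse_filter` produces a filter condition whose `evaluate` call yields the candidate document ids; pagination (`skip`/`take`) is applied to those candidates, so `total` reports the number of matching documents rather than the size of the index.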
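
For completeness, a hedged sketch of exercising the new `/fetch` route over HTTP, using `reqwest` with its `blocking` and `json` features; the address, index uid, and filter values are assumptions for the example, not values from the patch:

```rust
use serde_json::json;

fn main() -> Result<(), reqwest::Error> {
    // POST /indexes/{index_uid}/documents/fetch with a BrowseQuery-shaped
    // JSON body: offset, limit, fields, filter.
    let client = reqwest::blocking::Client::new();
    let response = client
        .post("http://localhost:7700/indexes/movies/documents/fetch")
        .json(&json!({
            "offset": 0,
            "limit": 20,
            "fields": ["title", "genre"],
            "filter": "genre = comedy"
        }))
        .send()?;
    println!("{}", response.text()?);
    Ok(())
}
```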