From 8fd9dc231c05cf1ab277f0d3a3c0b36163a074e5 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Wed, 10 Feb 2021 17:08:37 +0100
Subject: [PATCH 1/3] implement retrieve all documents

---
 src/data/search.rs     | 43 ++++++++++++++++++++++++++++++++++++++++--
 src/routes/document.rs | 34 ++++++++++++++++++++++++++-------
 src/routes/index.rs    |  1 -
 3 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/src/data/search.rs b/src/data/search.rs
index d0858d704..329efc3ea 100644
--- a/src/data/search.rs
+++ b/src/data/search.rs
@@ -1,8 +1,9 @@
 use std::collections::HashSet;
 use std::mem;
 use std::time::Instant;
+use std::ops::RangeBounds;
 
-use anyhow::bail;
+use anyhow::{bail, Context};
 use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
 use milli::{Index, obkv_to_json, FacetCondition};
 use serde::{Deserialize, Serialize};
@@ -70,7 +71,7 @@ impl SearchQuery {
         let highlighter = Highlighter::new(&stop_words);
 
         for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
-            let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
+            let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv)?;
             if let Some(ref attributes_to_highlight) = self.attributes_to_highlight {
                 highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight);
             }
@@ -165,4 +166,42 @@ impl Data {
             None => bail!("index {:?} doesn't exists", index.as_ref()),
         }
     }
+
+    pub fn retrieve_documents(
+        &self,
+        index: impl AsRef<str>,
+        offset: usize,
+        count: usize,
+        attributes_to_retrieve: Option<&[&str]>,
+    ) -> anyhow::Result<Vec<Map<String, Value>>> {
+        let index = self.index_controller
+            .index(&index)?
+            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
+        let txn = index.read_txn()?;
+
+        let mut documents = Vec::new();
+
+        let fields_ids_map = index.fields_ids_map(&txn)?;
+
+        let attributes_to_retrieve_ids = match attributes_to_retrieve {
+            Some(attrs) => attrs
+                .as_ref()
+                .iter()
+                .filter_map(|f| fields_ids_map.id(f))
+                .collect::<Vec<_>>(),
+            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
+        };
+
+        let iter = index.documents.range(&txn, &(..))?
+            .skip(offset)
+            .take(count);
+
+        for entry in iter {
+            let (_id, obkv) = entry?;
+            let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
+            documents.push(object);
+        }
+
+        Ok(documents)
+    }
 }
diff --git a/src/routes/document.rs b/src/routes/document.rs
index dcc669f85..1114ddcb3 100644
--- a/src/routes/document.rs
+++ b/src/routes/document.rs
@@ -12,6 +12,9 @@ use crate::error::ResponseError;
 use crate::helpers::Authentication;
 use crate::routes::IndexParam;
 
+const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
+const DEFAULT_RETRIEVE_DOCUMENTS_LIMIT: usize = 20;
+
 macro_rules! guard_content_type {
     ($fn_name:ident, $guard_value:literal) => {
         fn $fn_name(head: &actix_web::dev::RequestHead) -> bool {
@@ -69,18 +72,35 @@ async fn delete_document(
 #[derive(Deserialize)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 struct BrowseQuery {
-    _offset: Option<usize>,
-    _limit: Option<usize>,
-    _attributes_to_retrieve: Option<String>,
+    offset: Option<usize>,
+    limit: Option<usize>,
+    attributes_to_retrieve: Option<String>,
 }
 
 #[get("/indexes/{index_uid}/documents", wrap = "Authentication::Public")]
 async fn get_all_documents(
-    _data: web::Data<Data>,
-    _path: web::Path<IndexParam>,
-    _params: web::Query<BrowseQuery>,
+    data: web::Data<Data>,
+    path: web::Path<IndexParam>,
+    params: web::Query<BrowseQuery>,
 ) -> Result<HttpResponse, ResponseError> {
-    todo!()
+    let attributes_to_retrieve = params
+        .attributes_to_retrieve
+        .as_ref()
+        .map(|attrs| attrs
+            .split(",")
+            .collect::<Vec<_>>());
+
+    match data.retrieve_documents(
+        &path.index_uid,
+        params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET),
+        params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT),
+        attributes_to_retrieve.as_deref()) {
+        Ok(docs) => {
+            let json = serde_json::to_string(&docs).unwrap();
+            Ok(HttpResponse::Ok().body(json))
+        }
+        Err(_) => { todo!() }
+    }
 }
 
 #[derive(Deserialize)]
diff --git a/src/routes/index.rs b/src/routes/index.rs
index d682376e3..a77f26e1f 100644
--- a/src/routes/index.rs
+++ b/src/routes/index.rs
@@ -32,7 +32,6 @@ async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
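Not part of the patch series itself: below is a minimal, std-only sketch of the pagination and attribute-projection behaviour that `retrieve_documents` introduces above (skip `offset` entries in key order, take `limit`, defaults 0 and 20 via the new constants, and keep only the requested attributes). `DocumentStore` and its field names are hypothetical stand-ins; the real implementation walks the milli documents tree inside a heed read transaction as the diff shows.

```rust
use std::collections::BTreeMap;

// Hypothetical in-memory stand-in for the documents tree of the milli index.
type DocumentStore = BTreeMap<u32, BTreeMap<String, String>>;

// Skip `offset` documents in key order, take `limit`, and project each document
// onto `attributes_to_retrieve` (keep every field when it is `None`).
fn retrieve_documents(
    store: &DocumentStore,
    offset: usize,
    limit: usize,
    attributes_to_retrieve: Option<&[&str]>,
) -> Vec<BTreeMap<String, String>> {
    store
        .values()
        .skip(offset)
        .take(limit)
        .map(|doc| match attributes_to_retrieve {
            Some(attrs) => doc
                .iter()
                .filter(|(k, _)| attrs.contains(&k.as_str()))
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect(),
            None => doc.clone(),
        })
        .collect()
}

fn main() {
    let mut store = DocumentStore::new();
    for id in 0..5u32 {
        let mut doc = BTreeMap::new();
        doc.insert("id".to_string(), id.to_string());
        doc.insert("title".to_string(), format!("movie {}", id));
        doc.insert("overview".to_string(), format!("overview {}", id));
        store.insert(id, doc);
    }

    // Rough equivalent of `GET /indexes/movies/documents?offset=1&limit=2&attributesToRetrieve=id,title`
    // (the index name "movies" is only illustrative).
    let page = retrieve_documents(&store, 1, 2, Some(&["id", "title"]));
    println!("{:#?}", page);
}
```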
guard_content_type { ($fn_name:ident, $guard_value:literal) => { fn $fn_name(head: &actix_web::dev::RequestHead) -> bool { @@ -69,18 +72,35 @@ async fn delete_document( #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct BrowseQuery { - _offset: Option, - _limit: Option, - _attributes_to_retrieve: Option, + offset: Option, + limit: Option, + attributes_to_retrieve: Option, } #[get("/indexes/{index_uid}/documents", wrap = "Authentication::Public")] async fn get_all_documents( - _data: web::Data, - _path: web::Path, - _params: web::Query, + data: web::Data, + path: web::Path, + params: web::Query, ) -> Result { - todo!() + let attributes_to_retrieve = params + .attributes_to_retrieve + .as_ref() + .map(|attrs| attrs + .split(",") + .collect::>()); + + match data.retrieve_documents( + &path.index_uid, + params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET), + params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT), + attributes_to_retrieve.as_deref()) { + Ok(docs) => { + let json = serde_json::to_string(&docs).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(_) => { todo!() } + } } #[derive(Deserialize)] diff --git a/src/routes/index.rs b/src/routes/index.rs index d682376e3..a77f26e1f 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -32,7 +32,6 @@ async fn list_indexes(data: web::Data) -> Result Date: Thu, 11 Feb 2021 10:59:23 +0100 Subject: [PATCH 2/3] implement get document --- src/data/search.rs | 48 +++++++++++++++++-- .../local_index_controller/update_store.rs | 2 + src/routes/document.rs | 21 ++++++-- 3 files changed, 61 insertions(+), 10 deletions(-) diff --git a/src/data/search.rs b/src/data/search.rs index 329efc3ea..33e48e4fd 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; use std::mem; use std::time::Instant; -use std::ops::RangeBounds; use anyhow::{bail, Context}; use meilisearch_tokenizer::{Analyzer, AnalyzerConfig}; @@ -171,15 +170,14 @@ impl Data { &self, index: impl AsRef, offset: usize, - count: usize, + limit: usize, attributes_to_retrieve: Option<&[&str]>, ) -> anyhow::Result>> { let index = self.index_controller .index(&index)? .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; - let txn = index.read_txn()?; - let mut documents = Vec::new(); + let txn = index.read_txn()?; let fields_ids_map = index.fields_ids_map(&txn)?; @@ -194,7 +192,9 @@ impl Data { let iter = index.documents.range(&txn, &(..))? .skip(offset) - .take(count); + .take(limit); + + let mut documents = Vec::new(); for entry in iter { let (_id, obkv) = entry?; @@ -204,4 +204,42 @@ impl Data { Ok(documents) } + + pub fn retrieve_document( + &self, + index: impl AsRef, + document_id: impl AsRef, + attributes_to_retrieve: Option<&[&str]>, + ) -> anyhow::Result> { + let index = self.index_controller + .index(&index)? + .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; + let txn = index.read_txn()?; + + let fields_ids_map = index.fields_ids_map(&txn)?; + + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .as_ref() + .iter() + .filter_map(|f| fields_ids_map.id(f)) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let internal_id = index + .external_documents_ids(&txn)? + .get(document_id.as_ref().as_bytes()) + .with_context(|| format!("Document with id {} not found", document_id.as_ref()))?; + + let document = index.documents(&txn, std::iter::once(internal_id))? 
From 8bb1b6146f4aa97b520aea7effe3a28dd32d3a12 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Mon, 15 Feb 2021 23:02:20 +0100
Subject: [PATCH 3/3] make retrieval non blocking

---
 src/data/search.rs     | 126 ++++++++++++++++++++++-------------------
 src/routes/document.rs |  11 ++--
 2 files changed, 75 insertions(+), 62 deletions(-)

diff --git a/src/data/search.rs b/src/data/search.rs
index 33e48e4fd..b4be47c35 100644
--- a/src/data/search.rs
+++ b/src/data/search.rs
@@ -166,80 +166,92 @@ impl Data {
         }
     }
 
-    pub fn retrieve_documents(
+    pub async fn retrieve_documents<S>(
         &self,
-        index: impl AsRef<str>,
+        index: impl AsRef<str> + Send + Sync + 'static,
         offset: usize,
         limit: usize,
-        attributes_to_retrieve: Option<&[&str]>,
-    ) -> anyhow::Result<Vec<Map<String, Value>>> {
-        let index = self.index_controller
-            .index(&index)?
-            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
+        attributes_to_retrieve: Option<Vec<S>>,
+    ) -> anyhow::Result<Vec<Map<String, Value>>>
+    where
+        S: AsRef<str> + Send + Sync + 'static
+    {
+        let index_controller = self.index_controller.clone();
+        let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
+            let index = index_controller
+                .index(&index)?
+                .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
 
-        let txn = index.read_txn()?;
+            let txn = index.read_txn()?;
 
-        let fields_ids_map = index.fields_ids_map(&txn)?;
+            let fields_ids_map = index.fields_ids_map(&txn)?;
 
-        let attributes_to_retrieve_ids = match attributes_to_retrieve {
-            Some(attrs) => attrs
-                .as_ref()
-                .iter()
-                .filter_map(|f| fields_ids_map.id(f))
-                .collect::<Vec<_>>(),
-            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
-        };
+            let attributes_to_retrieve_ids = match attributes_to_retrieve {
+                Some(attrs) => attrs
+                    .iter()
+                    .filter_map(|f| fields_ids_map.id(f.as_ref()))
+                    .collect::<Vec<_>>(),
+                None => fields_ids_map.iter().map(|(id, _)| id).collect(),
+            };
 
-        let iter = index.documents.range(&txn, &(..))?
-            .skip(offset)
-            .take(limit);
+            let iter = index.documents.range(&txn, &(..))?
+                .skip(offset)
+                .take(limit);
 
-        let mut documents = Vec::new();
+            let mut documents = Vec::new();
 
-        for entry in iter {
-            let (_id, obkv) = entry?;
-            let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
-            documents.push(object);
-        }
+            for entry in iter {
+                let (_id, obkv) = entry?;
+                let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
+                documents.push(object);
+            }
 
-        Ok(documents)
+            Ok(documents)
+        }).await?;
+        documents
     }
 
-    pub fn retrieve_document(
+    pub async fn retrieve_document<S>(
        &self,
-        index: impl AsRef<str>,
-        document_id: impl AsRef<str>,
-        attributes_to_retrieve: Option<&[&str]>,
-    ) -> anyhow::Result<Map<String, Value>> {
-        let index = self.index_controller
-            .index(&index)?
-            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
-        let txn = index.read_txn()?;
+        index: impl AsRef<str> + Sync + Send + 'static,
+        document_id: impl AsRef<str> + Sync + Send + 'static,
+        attributes_to_retrieve: Option<Vec<S>>,
+    ) -> anyhow::Result<Map<String, Value>>
+    where
+        S: AsRef<str> + Sync + Send + 'static,
+    {
+        let index_controller = self.index_controller.clone();
+        let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
+            let index = index_controller
+                .index(&index)?
+                .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
+            let txn = index.read_txn()?;
 
-        let fields_ids_map = index.fields_ids_map(&txn)?;
+            let fields_ids_map = index.fields_ids_map(&txn)?;
 
-        let attributes_to_retrieve_ids = match attributes_to_retrieve {
-            Some(attrs) => attrs
-                .as_ref()
-                .iter()
-                .filter_map(|f| fields_ids_map.id(f))
-                .collect::<Vec<_>>(),
-            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
-        };
+            let attributes_to_retrieve_ids = match attributes_to_retrieve {
+                Some(attrs) => attrs
+                    .iter()
+                    .filter_map(|f| fields_ids_map.id(f.as_ref()))
+                    .collect::<Vec<_>>(),
+                None => fields_ids_map.iter().map(|(id, _)| id).collect(),
+            };
 
-        let internal_id = index
-            .external_documents_ids(&txn)?
-            .get(document_id.as_ref().as_bytes())
-            .with_context(|| format!("Document with id {} not found", document_id.as_ref()))?;
+            let internal_id = index
+                .external_documents_ids(&txn)?
+                .get(document_id.as_ref().as_bytes())
+                .with_context(|| format!("Document with id {} not found", document_id.as_ref()))?;
 
-        let document = index.documents(&txn, std::iter::once(internal_id))?
-            .into_iter()
-            .next()
-            .map(|(_, d)| d);
+            let document = index.documents(&txn, std::iter::once(internal_id))?
+                .into_iter()
+                .next()
+                .map(|(_, d)| d);
 
-        match document {
-            Some(document) => Ok(obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, document)?),
-            None => bail!("Document with id {} not found", document_id.as_ref()),
-        }
+            match document {
+                Some(document) => Ok(obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, document)?),
+                None => bail!("Document with id {} not found", document_id.as_ref()),
+            }
+        }).await?;
+        document
     }
 }
diff --git a/src/routes/document.rs b/src/routes/document.rs
index ac20cfff0..a240867dd 100644
--- a/src/routes/document.rs
+++ b/src/routes/document.rs
@@ -55,9 +55,9 @@ async fn get_document(
     data: web::Data<Data>,
     path: web::Path<DocumentParam>,
 ) -> Result<HttpResponse, ResponseError> {
-    let index = &path.index_uid;
-    let id = &path.document_id;
-    match data.retrieve_document(index, id, None) {
+    let index = path.index_uid.clone();
+    let id = path.document_id.clone();
+    match data.retrieve_document(index, id, None as Option<Vec<String>>).await {
         Ok(document) => {
             let json = serde_json::to_string(&document).unwrap();
             Ok(HttpResponse::Ok().body(json))
@@ -99,13 +99,14 @@ async fn get_all_documents(
         .as_ref()
         .map(|attrs| attrs
             .split(",")
+            .map(String::from)
             .collect::<Vec<_>>());
 
     match data.retrieve_documents(
-        &path.index_uid,
+        path.index_uid.clone(),
         params.offset.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_OFFSET),
         params.limit.unwrap_or(DEFAULT_RETRIEVE_DOCUMENTS_LIMIT),
-        attributes_to_retrieve.as_deref()) {
+        attributes_to_retrieve).await {
         Ok(docs) => {
            let json = serde_json::to_string(&docs).unwrap();
            Ok(HttpResponse::Ok().body(json))
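A closing sketch, not taken from the patches, of the pattern [PATCH 3/3] applies in both retrieval methods: clone the handle the closure needs, move owned arguments into `tokio::task::spawn_blocking` so the heed read transaction never runs on the async executor threads, then `.await?` the join handle and return the closure's own `anyhow::Result`. `retrieve_blocking` and the index name "movies" are hypothetical; this assumes a binary depending on `tokio` (features `rt-multi-thread`, `macros`) and `anyhow`, with the closure body standing in for the real index lookups.

```rust
use std::time::Duration;

// Same shape as the patched `retrieve_documents`/`retrieve_document`: blocking
// work goes to tokio's blocking pool, the JoinError is converted by `?`, and
// the closure's inner Result is propagated afterwards.
async fn retrieve_blocking(index_uid: String) -> anyhow::Result<String> {
    let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
        // Placeholder for opening a heed read transaction and walking documents.
        std::thread::sleep(Duration::from_millis(10));
        Ok(format!("documents of index {:?}", index_uid))
    })
    .await?; // JoinError -> anyhow::Error
    documents
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let docs = retrieve_blocking("movies".to_string()).await?;
    println!("{}", docs);
    Ok(())
}
```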