diff --git a/src/data/search.rs b/src/data/search.rs index 319f8b973..753083b7e 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -22,52 +22,13 @@ impl Data { self.index_controller.documents(index, offset, limit, attributes_to_retrieve).await } - pub async fn retrieve_document( + pub async fn retrieve_document( &self, - _index: impl AsRef + Sync + Send + 'static, - _document_id: impl AsRef + Sync + Send + 'static, - _attributes_to_retrieve: Option>, + index: impl AsRef + Sync + Send + 'static, + document_id: impl AsRef + Sync + Send + 'static, + attributes_to_retrieve: Option>, ) -> anyhow::Result> { - todo!() - //let index_controller = self.index_controller.clone(); - //let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || { - //let index = index_controller - //.index(&index)? - //.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; - //let txn = index.read_txn()?; - - //let fields_ids_map = index.fields_ids_map(&txn)?; - - //let attributes_to_retrieve_ids = match attributes_to_retrieve { - //Some(attrs) => attrs - //.iter() - //.filter_map(|f| fields_ids_map.id(f.as_ref())) - //.collect::>(), - //None => fields_ids_map.iter().map(|(id, _)| id).collect(), - //}; - - //let internal_id = index - //.external_documents_ids(&txn)? - //.get(document_id.as_ref().as_bytes()) - //.with_context(|| format!("Document with id {} not found", document_id.as_ref()))?; - - //let document = index - //.documents(&txn, std::iter::once(internal_id))? - //.into_iter() - //.next() - //.map(|(_, d)| d); - - //match document { - //Some(document) => Ok(obkv_to_json( - //&attributes_to_retrieve_ids, - //&fields_ids_map, - //document, - //)?), - //None => bail!("Document with id {} not found", document_id.as_ref()), - //} - //}) - //.await?; - //document + self.index_controller.document(index.as_ref().to_string(), document_id.as_ref().to_string(), attributes_to_retrieve).await } } diff --git a/src/index/mod.rs b/src/index/mod.rs index 8ae2ba6a2..c50c2873c 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -4,6 +4,7 @@ mod updates; use std::sync::Arc; use std::ops::Deref; +use anyhow::{bail, Context}; use serde_json::{Value, Map}; use milli::obkv_to_json; @@ -84,4 +85,42 @@ impl Index { Ok(documents) } + + pub fn retrieve_document>( + &self, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> anyhow::Result> { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f.as_ref())) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let internal_id = self + .external_documents_ids(&txn)? + .get(doc_id.as_bytes()) + .with_context(|| format!("Document with id {} not found", doc_id))?; + + let document = self + .documents(&txn, std::iter::once(internal_id))? + .into_iter() + .next() + .map(|(_, d)| d); + + match document { + Some(document) => Ok(obkv_to_json( + &attributes_to_retrieve_ids, + &fields_ids_map, + document, + )?), + None => bail!("Document with id {} not found", doc_id), + } + } } diff --git a/src/index_controller/index_actor.rs b/src/index_controller/index_actor.rs index 8e59227ac..59f000575 100644 --- a/src/index_controller/index_actor.rs +++ b/src/index_controller/index_actor.rs @@ -8,7 +8,6 @@ use chrono::Utc; use futures::stream::StreamExt; use heed::EnvOpenOptions; use log::info; -use serde_json::{Map, Value}; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; @@ -51,8 +50,14 @@ enum IndexMsg { attributes_to_retrieve: Option>, offset: usize, limit: usize, - ret: oneshot::Sender>>>, + ret: oneshot::Sender>>, }, + Document { + uuid: Uuid, + attributes_to_retrieve: Option>, + doc_id: String, + ret: oneshot::Sender>, + } } struct IndexActor { @@ -126,6 +131,7 @@ impl IndexActor { self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) .await } + Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await, } }); @@ -196,6 +202,21 @@ impl IndexActor { let _ = ret.send(result); }).await; } + + async fn handle_fetch_document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ret: oneshot::Sender>, + ) { + let index = self.store.get(uuid).await.unwrap().unwrap(); + tokio::task::spawn_blocking(move || { + let result = index.retrieve_document(doc_id, attributes_to_retrieve) + .map_err(|e| IndexError::Error(e)); + let _ = ret.send(result); + }).await; + } } #[derive(Clone)] @@ -267,6 +288,23 @@ impl IndexActorHandle { let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } + + pub async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Document { + uuid, + ret, + doc_id, + attributes_to_retrieve, + }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } } struct MapIndexStore { diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 47ce04b96..dabf1b936 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -172,6 +172,20 @@ impl IndexController { Ok(documents) } + pub async fn document( + &self, + index: String, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> anyhow::Result { + let uuid = self.uuid_resolver + .resolve(index.clone()) + .await? + .with_context(|| format!("Index {:?} doesn't exist", index))?; + let document = self.index_handle.document(uuid, doc_id, attributes_to_retrieve).await?; + Ok(document) + } + fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result { todo!() } diff --git a/src/routes/document.rs b/src/routes/document.rs index af9efc701..00d037359 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -55,18 +55,17 @@ async fn get_document( data: web::Data, path: web::Path, ) -> Result { - todo!() - //let index = path.index_uid.clone(); - //let id = path.document_id.clone(); - //match data.retrieve_document(index, id, None as Option>).await { - //Ok(document) => { - //let json = serde_json::to_string(&document).unwrap(); - //Ok(HttpResponse::Ok().body(json)) - //} - //Err(e) => { - //Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) - //} - //} + let index = path.index_uid.clone(); + let id = path.document_id.clone(); + match data.retrieve_document(index, id, None as Option>).await { + Ok(document) => { + let json = serde_json::to_string(&document).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) + } + } } #[delete(