implement retrieve one document

This commit is contained in:
mpostma 2021-03-04 15:09:00 +01:00
parent f3d65ec5e9
commit 581dcd5735
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 109 additions and 58 deletions

View File

@ -22,52 +22,13 @@ impl Data {
self.index_controller.documents(index, offset, limit, attributes_to_retrieve).await self.index_controller.documents(index, offset, limit, attributes_to_retrieve).await
} }
pub async fn retrieve_document<S>( pub async fn retrieve_document(
&self, &self,
_index: impl AsRef<str> + Sync + Send + 'static, index: impl AsRef<str> + Sync + Send + 'static,
_document_id: impl AsRef<str> + Sync + Send + 'static, document_id: impl AsRef<str> + Sync + Send + 'static,
_attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Map<String, Value>> ) -> anyhow::Result<Map<String, Value>>
{ {
todo!() self.index_controller.document(index.as_ref().to_string(), document_id.as_ref().to_string(), attributes_to_retrieve).await
//let index_controller = self.index_controller.clone();
//let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
//let index = index_controller
//.index(&index)?
//.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
//let txn = index.read_txn()?;
//let fields_ids_map = index.fields_ids_map(&txn)?;
//let attributes_to_retrieve_ids = match attributes_to_retrieve {
//Some(attrs) => attrs
//.iter()
//.filter_map(|f| fields_ids_map.id(f.as_ref()))
//.collect::<Vec<_>>(),
//None => fields_ids_map.iter().map(|(id, _)| id).collect(),
//};
//let internal_id = index
//.external_documents_ids(&txn)?
//.get(document_id.as_ref().as_bytes())
//.with_context(|| format!("Document with id {} not found", document_id.as_ref()))?;
//let document = index
//.documents(&txn, std::iter::once(internal_id))?
//.into_iter()
//.next()
//.map(|(_, d)| d);
//match document {
//Some(document) => Ok(obkv_to_json(
//&attributes_to_retrieve_ids,
//&fields_ids_map,
//document,
//)?),
//None => bail!("Document with id {} not found", document_id.as_ref()),
//}
//})
//.await?;
//document
} }
} }

View File

@ -4,6 +4,7 @@ mod updates;
use std::sync::Arc; use std::sync::Arc;
use std::ops::Deref; use std::ops::Deref;
use anyhow::{bail, Context};
use serde_json::{Value, Map}; use serde_json::{Value, Map};
use milli::obkv_to_json; use milli::obkv_to_json;
@ -84,4 +85,42 @@ impl Index {
Ok(documents) Ok(documents)
} }
pub fn retrieve_document<S: AsRef<str>>(
&self,
doc_id: String,
attributes_to_retrieve: Option<Vec<S>>,
) -> anyhow::Result<Map<String, Value>> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f.as_ref()))
.collect::<Vec<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let internal_id = self
.external_documents_ids(&txn)?
.get(doc_id.as_bytes())
.with_context(|| format!("Document with id {} not found", doc_id))?;
let document = self
.documents(&txn, std::iter::once(internal_id))?
.into_iter()
.next()
.map(|(_, d)| d);
match document {
Some(document) => Ok(obkv_to_json(
&attributes_to_retrieve_ids,
&fields_ids_map,
document,
)?),
None => bail!("Document with id {} not found", doc_id),
}
}
} }

View File

@ -8,7 +8,6 @@ use chrono::Utc;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::info; use log::info;
use serde_json::{Map, Value};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
@ -51,8 +50,14 @@ enum IndexMsg {
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
offset: usize, offset: usize,
limit: usize, limit: usize,
ret: oneshot::Sender<Result<Vec<Map<String, Value>>>>, ret: oneshot::Sender<Result<Vec<Document>>>,
}, },
Document {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
doc_id: String,
ret: oneshot::Sender<Result<Document>>,
}
} }
struct IndexActor<S> { struct IndexActor<S> {
@ -126,6 +131,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret)
.await .await
} }
Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await,
} }
}); });
@ -196,6 +202,21 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
let _ = ret.send(result); let _ = ret.send(result);
}).await; }).await;
} }
async fn handle_fetch_document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
ret: oneshot::Sender<Result<Document>>,
) {
let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || {
let result = index.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e));
let _ = ret.send(result);
}).await;
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -267,6 +288,23 @@ impl IndexActorHandle {
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
pub async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
ret,
doc_id,
attributes_to_retrieve,
};
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
} }
struct MapIndexStore { struct MapIndexStore {

View File

@ -172,6 +172,20 @@ impl IndexController {
Ok(documents) Ok(documents)
} }
pub async fn document(
&self,
index: String,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Document> {
let uuid = self.uuid_resolver
.resolve(index.clone())
.await?
.with_context(|| format!("Index {:?} doesn't exist", index))?;
let document = self.index_handle.document(uuid, doc_id, attributes_to_retrieve).await?;
Ok(document)
}
fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> { fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
todo!() todo!()
} }

View File

@ -55,18 +55,17 @@ async fn get_document(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
todo!() let index = path.index_uid.clone();
//let index = path.index_uid.clone(); let id = path.document_id.clone();
//let id = path.document_id.clone(); match data.retrieve_document(index, id, None as Option<Vec<String>>).await {
//match data.retrieve_document(index, id, None as Option<Vec<String>>).await { Ok(document) => {
//Ok(document) => { let json = serde_json::to_string(&document).unwrap();
//let json = serde_json::to_string(&document).unwrap(); Ok(HttpResponse::Ok().body(json))
//Ok(HttpResponse::Ok().body(json)) }
//} Err(e) => {
//Err(e) => { Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
//Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) }
//} }
//}
} }
#[delete( #[delete(