fix clippy warnings

This commit is contained in:
mpostma 2021-03-15 16:52:05 +01:00
parent 01479dcf99
commit abbea59732
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
17 changed files with 124 additions and 131 deletions

View File

@ -78,22 +78,22 @@ impl Data {
Ok(Data { inner }) Ok(Data { inner })
} }
pub async fn settings<S: AsRef<str>>(&self, uid: S) -> anyhow::Result<Settings> { pub async fn settings(&self, uid: String) -> anyhow::Result<Settings> {
self.index_controller.settings(uid.as_ref().to_string()).await self.index_controller.settings(uid).await
} }
pub async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> { pub async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
self.index_controller.list_indexes().await self.index_controller.list_indexes().await
} }
pub async fn index(&self, uid: impl AsRef<str>) -> anyhow::Result<Option<IndexMetadata>> { pub async fn index(&self, uid: String) -> anyhow::Result<IndexMetadata> {
self.index_controller.get_index(uid.as_ref().to_string()).await self.index_controller.get_index(uid).await
} }
pub async fn create_index(&self, uid: impl AsRef<str>, primary_key: Option<impl AsRef<str>>) -> anyhow::Result<IndexMetadata> { pub async fn create_index(&self, uid: String, primary_key: Option<String>) -> anyhow::Result<IndexMetadata> {
let settings = IndexSettings { let settings = IndexSettings {
uid: Some(uid.as_ref().to_string()), uid: Some(uid),
primary_key: primary_key.map(|s| s.as_ref().to_string()), primary_key,
}; };
let meta = self.index_controller.create_index(settings).await?; let meta = self.index_controller.create_index(settings).await?;

View File

@ -4,12 +4,12 @@ use crate::index::{SearchQuery, SearchResult};
use super::Data; use super::Data;
impl Data { impl Data {
pub async fn search<S: AsRef<str>>( pub async fn search(
&self, &self,
index: S, index: String,
search_query: SearchQuery, search_query: SearchQuery,
) -> anyhow::Result<SearchResult> { ) -> anyhow::Result<SearchResult> {
self.index_controller.search(index.as_ref().to_string(), search_query).await self.index_controller.search(index, search_query).await
} }
pub async fn retrieve_documents( pub async fn retrieve_documents(
@ -24,11 +24,11 @@ impl Data {
pub async fn retrieve_document( pub async fn retrieve_document(
&self, &self,
index: impl AsRef<str> + Sync + Send + 'static, index: String,
document_id: impl AsRef<str> + Sync + Send + 'static, document_id: String,
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
) -> anyhow::Result<Map<String, Value>> ) -> anyhow::Result<Map<String, Value>>
{ {
self.index_controller.document(index.as_ref().to_string(), document_id.as_ref().to_string(), attributes_to_retrieve).await self.index_controller.document(index, document_id, attributes_to_retrieve).await
} }
} }

View File

@ -9,14 +9,14 @@ use super::Data;
impl Data { impl Data {
pub async fn add_documents( pub async fn add_documents(
&self, &self,
index: impl AsRef<str> + Send + Sync + 'static, index: String,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
stream: Payload, stream: Payload,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> ) -> anyhow::Result<UpdateStatus>
{ {
let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, stream, primary_key).await?; let update_status = self.index_controller.add_documents(index, method, format, stream, primary_key).await?;
Ok(update_status) Ok(update_status)
} }
@ -27,53 +27,53 @@ impl Data {
create: bool, create: bool,
) -> anyhow::Result<UpdateStatus> { ) -> anyhow::Result<UpdateStatus> {
let update = self.index_controller.update_settings(index, settings, create).await?; let update = self.index_controller.update_settings(index, settings, create).await?;
Ok(update.into()) Ok(update)
} }
pub async fn clear_documents( pub async fn clear_documents(
&self, &self,
index: impl AsRef<str> + Sync + Send + 'static, index: String,
) -> anyhow::Result<UpdateStatus> { ) -> anyhow::Result<UpdateStatus> {
let update = self.index_controller.clear_documents(index.as_ref().to_string()).await?; let update = self.index_controller.clear_documents(index).await?;
Ok(update) Ok(update)
} }
pub async fn delete_documents( pub async fn delete_documents(
&self, &self,
index: impl AsRef<str> + Sync + Send + 'static, index: String,
document_ids: Vec<String>, document_ids: Vec<String>,
) -> anyhow::Result<UpdateStatus> { ) -> anyhow::Result<UpdateStatus> {
let update = self.index_controller.delete_documents(index.as_ref().to_string(), document_ids).await?; let update = self.index_controller.delete_documents(index, document_ids).await?;
Ok(update.into()) Ok(update)
} }
pub async fn delete_index( pub async fn delete_index(
&self, &self,
index: impl AsRef<str> + Send + Sync + 'static, index: String,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
self.index_controller.delete_index(index.as_ref().to_owned()).await?; self.index_controller.delete_index(index).await?;
Ok(()) Ok(())
} }
pub async fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> { pub async fn get_update_status(&self, index: String, uid: u64) -> anyhow::Result<UpdateStatus> {
self.index_controller.update_status(index.as_ref().to_string(), uid).await self.index_controller.update_status(index, uid).await
} }
pub async fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> { pub async fn get_updates_status(&self, index: String) -> anyhow::Result<Vec<UpdateStatus>> {
self.index_controller.all_update_status(index.as_ref().to_string()).await self.index_controller.all_update_status(index).await
} }
pub async fn update_index( pub async fn update_index(
&self, &self,
uid: impl AsRef<str>, uid: String,
primary_key: Option<impl AsRef<str>>, primary_key: Option<String>,
new_uid: Option<impl AsRef<str>> new_uid: Option<String>
) -> anyhow::Result<IndexMetadata> { ) -> anyhow::Result<IndexMetadata> {
let settings = IndexSettings { let settings = IndexSettings {
uid: new_uid.map(|s| s.as_ref().to_string()), uid: new_uid,
primary_key: primary_key.map(|s| s.as_ref().to_string()), primary_key,
}; };
self.index_controller.update_index(uid.as_ref().to_string(), settings).await self.index_controller.update_index(uid, settings).await
} }
} }

View File

@ -120,7 +120,7 @@ impl Index {
fn parse_facets_array( fn parse_facets_array(
txn: &RoTxn, txn: &RoTxn,
index: &Index, index: &Index,
arr: &Vec<Value>, arr: &[Value],
) -> anyhow::Result<Option<FacetCondition>> { ) -> anyhow::Result<Option<FacetCondition>> {
let mut ands = Vec::new(); let mut ands = Vec::new();
for value in arr { for value in arr {

View File

@ -85,11 +85,8 @@ impl Index {
let mut wtxn = self.write_txn()?; let mut wtxn = self.write_txn()?;
// Set the primary key if not set already, ignore if already set. // Set the primary key if not set already, ignore if already set.
match (self.primary_key(&wtxn)?, primary_key) { if let (None, Some(ref primary_key)) = (self.primary_key(&wtxn)?, primary_key) {
(None, Some(ref primary_key)) => { self.put_primary_key(&mut wtxn, primary_key)?;
self.put_primary_key(&mut wtxn, primary_key)?;
}
_ => (),
} }
let mut builder = update_builder.index_documents(&mut wtxn, self); let mut builder = update_builder.index_documents(&mut wtxn, self);
@ -109,13 +106,10 @@ impl Index {
info!("document addition done: {:?}", result); info!("document addition done: {:?}", result);
match result { result.and_then(|addition_result| wtxn
Ok(addition_result) => wtxn .commit()
.commit() .and(Ok(UpdateResult::DocumentsAddition(addition_result)))
.and(Ok(UpdateResult::DocumentsAddition(addition_result))) .map_err(Into::into))
.map_err(Into::into),
Err(e) => Err(e.into()),
}
} }
pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> { pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
@ -128,7 +122,7 @@ impl Index {
.commit() .commit()
.and(Ok(UpdateResult::Other)) .and(Ok(UpdateResult::Other))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()), Err(e) => Err(e),
} }
} }
@ -159,7 +153,7 @@ impl Index {
// We transpose the settings JSON struct into a real setting update. // We transpose the settings JSON struct into a real setting update.
if let Some(ref facet_types) = settings.attributes_for_faceting { if let Some(ref facet_types) = settings.attributes_for_faceting {
let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new()); let facet_types = facet_types.clone().unwrap_or_else(HashMap::new);
builder.set_faceted_fields(facet_types); builder.set_faceted_fields(facet_types);
} }
@ -179,7 +173,7 @@ impl Index {
.commit() .commit()
.and(Ok(UpdateResult::Other)) .and(Ok(UpdateResult::Other))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()), Err(e) => Err(e),
} }
} }
@ -202,7 +196,7 @@ impl Index {
.commit() .commit()
.and(Ok(UpdateResult::Other)) .and(Ok(UpdateResult::Other))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()), Err(e) => Err(e),
} }
} }
@ -223,7 +217,7 @@ impl Index {
.commit() .commit()
.and(Ok(UpdateResult::DocumentDeletion { deleted })) .and(Ok(UpdateResult::DocumentDeletion { deleted }))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e)
} }
} }
} }

View File

@ -92,7 +92,7 @@ enum IndexMsg {
}, },
GetMeta { GetMeta {
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<Result<Option<IndexMeta>>>, ret: oneshot::Sender<Result<IndexMeta>>,
}, },
UpdateIndex { UpdateIndex {
uuid: Uuid, uuid: Uuid,
@ -137,7 +137,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
) -> Result<Self> { ) -> Result<Self> {
let options = IndexerOpts::default(); let options = IndexerOpts::default();
let update_handler = let update_handler =
UpdateHandler::new(&options).map_err(|e| IndexError::Error(e.into()))?; UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = Arc::new(update_handler); let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver); let read_receiver = Some(read_receiver);
let write_receiver = Some(write_receiver); let write_receiver = Some(write_receiver);
@ -274,11 +274,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
data: File, data: File,
) -> Result<UpdateResult> { ) -> Result<UpdateResult> {
debug!("Processing update {}", meta.id()); debug!("Processing update {}", meta.id());
let uuid = meta.index_uuid().clone(); let uuid = meta.index_uuid();
let update_handler = self.update_handler.clone(); let update_handler = self.update_handler.clone();
let index = match self.store.get(uuid.clone()).await? { let index = match self.store.get(*uuid).await? {
Some(index) => index, Some(index) => index,
None => self.store.create(uuid, None).await?, None => self.store.create(*uuid, None).await?,
}; };
spawn_blocking(move || update_handler.handle_update(meta, data, index)) spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await .await
@ -291,7 +291,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid) .get(uuid)
.await? .await?
.ok_or(IndexError::UnexistingIndex)?; .ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.settings().map_err(|e| IndexError::Error(e))) spawn_blocking(move || index.settings().map_err(IndexError::Error))
.await .await
.map_err(|e| IndexError::Error(e.into()))? .map_err(|e| IndexError::Error(e.into()))?
} }
@ -311,7 +311,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
spawn_blocking(move || { spawn_blocking(move || {
index index
.retrieve_documents(offset, limit, attributes_to_retrieve) .retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)) .map_err(IndexError::Error)
}) })
.await .await
.map_err(|e| IndexError::Error(e.into()))? .map_err(|e| IndexError::Error(e.into()))?
@ -331,7 +331,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
spawn_blocking(move || { spawn_blocking(move || {
index index
.retrieve_document(doc_id, attributes_to_retrieve) .retrieve_document(doc_id, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)) .map_err(IndexError::Error)
}) })
.await .await
.map_err(|e| IndexError::Error(e.into()))? .map_err(|e| IndexError::Error(e.into()))?
@ -354,15 +354,15 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(()) Ok(())
} }
async fn handle_get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> { async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
match self.store.get(uuid).await? { match self.store.get(uuid).await? {
Some(index) => { Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index)) let meta = spawn_blocking(move || IndexMeta::new(&index))
.await .await
.map_err(|e| IndexError::Error(e.into()))??; .map_err(|e| IndexError::Error(e.into()))??;
Ok(Some(meta)) Ok(meta)
} }
None => Ok(None), None => Err(IndexError::UnexistingIndex),
} }
} }
@ -405,7 +405,7 @@ impl IndexActorHandle {
let (read_sender, read_receiver) = mpsc::channel(100); let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100); let (write_sender, write_receiver) = mpsc::channel(100);
let store = HeedIndexStore::new(path, index_size)?; let store = HeedIndexStore::new(path, index_size);
let actor = IndexActor::new(read_receiver, write_receiver, store)?; let actor = IndexActor::new(read_receiver, write_receiver, store)?;
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Ok(Self { Ok(Self {
@ -492,7 +492,7 @@ impl IndexActorHandle {
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
pub async fn get_index_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> { pub async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret }; let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.read_sender.send(msg).await; let _ = self.read_sender.send(msg).await;
@ -518,14 +518,14 @@ struct HeedIndexStore {
} }
impl HeedIndexStore { impl HeedIndexStore {
fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> { fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
let path = path.as_ref().join("indexes/"); let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new())); let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self { Self {
index_store, index_store,
path, path,
index_size, index_size,
}) }
} }
} }
@ -550,7 +550,7 @@ impl IndexStore for HeedIndexStore {
.await .await
.map_err(|e| IndexError::Error(e.into()))??; .map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid.clone(), index.clone()); self.index_store.write().await.insert(uuid, index.clone());
Ok(index) Ok(index)
} }
@ -574,7 +574,7 @@ impl IndexStore for HeedIndexStore {
self.index_store self.index_store
.write() .write()
.await .await
.insert(uuid.clone(), index.clone()); .insert(uuid, index.clone());
Ok(Some(index)) Ok(Some(index))
} }
} }
@ -595,6 +595,6 @@ fn open_index(path: impl AsRef<Path>, size: usize) -> Result<Index> {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(size); options.map_size(size);
let index = milli::Index::new(options, &path) let index = milli::Index::new(options, &path)
.map_err(|e| IndexError::Error(e))?; .map_err(IndexError::Error)?;
Ok(Index(Arc::new(index))) Ok(Index(Arc::new(index)))
} }

View File

@ -127,7 +127,7 @@ impl IndexController {
// the update is processed. This would make calls to GET index to fail until the update // the update is processed. This would make calls to GET index to fail until the update
// is complete. Since this is get or create, we ignore the error when the index already // is complete. Since this is get or create, we ignore the error when the index already
// exists. // exists.
match self.index_handle.create_index(uuid.clone(), None).await { match self.index_handle.create_index(uuid, None).await {
Ok(_) | Err(index_actor::IndexError::IndexAlreadyExists) => (), Ok(_) | Err(index_actor::IndexError::IndexAlreadyExists) => (),
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }
@ -158,12 +158,12 @@ impl IndexController {
let uuid = self.uuid_resolver let uuid = self.uuid_resolver
.delete(uid) .delete(uid)
.await?; .await?;
self.update_handle.delete(uuid.clone()).await?; self.update_handle.delete(uuid).await?;
self.index_handle.delete(uuid).await?; self.index_handle.delete(uuid).await?;
Ok(()) Ok(())
} }
pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> { pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result<UpdateStatus> {
let uuid = self.uuid_resolver let uuid = self.uuid_resolver
.resolve(uid) .resolve(uid)
.await?; .await?;
@ -184,10 +184,9 @@ impl IndexController {
let mut ret = Vec::new(); let mut ret = Vec::new();
for (uid, uuid) in uuids { for (uid, uuid) in uuids {
if let Some(meta) = self.index_handle.get_index_meta(uuid).await? { let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { uid, meta }; let meta = IndexMetadata { uid, meta };
ret.push(meta); ret.push(meta);
}
} }
Ok(ret) Ok(ret)
@ -247,13 +246,13 @@ impl IndexController {
Ok(result) Ok(result)
} }
pub async fn get_index(&self, uid: String) -> anyhow::Result<Option<IndexMetadata>> { pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> {
let uuid = self.uuid_resolver.resolve(uid.clone()).await?; let uuid = self.uuid_resolver.resolve(uid.clone()).await?;
let result = self.index_handle let meta = self.index_handle
.get_index_meta(uuid) .get_index_meta(uuid)
.await? .await?;
.map(|meta| IndexMetadata { uid, meta }); let meta = IndexMetadata { uid, meta };
Ok(result) Ok(meta)
} }
} }

View File

@ -25,6 +25,8 @@ pub enum UpdateError {
Error(Box<dyn std::error::Error + Sync + Send + 'static>), Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Index {0} doesn't exist.")] #[error("Index {0} doesn't exist.")]
UnexistingIndex(Uuid), UnexistingIndex(Uuid),
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
} }
enum UpdateMsg<D> { enum UpdateMsg<D> {
@ -40,7 +42,7 @@ enum UpdateMsg<D> {
}, },
GetUpdate { GetUpdate {
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<Result<Option<UpdateStatus>>>, ret: oneshot::Sender<Result<UpdateStatus>>,
id: u64, id: u64,
}, },
Delete { Delete {
@ -62,8 +64,8 @@ struct UpdateActor<D, S> {
#[async_trait::async_trait] #[async_trait::async_trait]
trait UpdateStoreStore { trait UpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>; async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
async fn delete(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>; async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>; async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
} }
impl<D, S> UpdateActor<D, S> impl<D, S> UpdateActor<D, S>
@ -145,18 +147,17 @@ where
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = update_store update_store
.register_update(meta, path, uuid) .register_update(meta, path, uuid)
.map(|pending| UpdateStatus::Pending(pending)) .map(UpdateStatus::Pending)
.map_err(|e| UpdateError::Error(Box::new(e))); .map_err(|e| UpdateError::Error(Box::new(e)))
result
}) })
.await .await
.map_err(|e| UpdateError::Error(Box::new(e)))? .map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.get(&uuid).await?; let update_store = self.store.get(uuid).await?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = update_store let result = update_store
.ok_or(UpdateError::UnexistingIndex(uuid))? .ok_or(UpdateError::UnexistingIndex(uuid))?
@ -168,20 +169,21 @@ where
.map_err(|e| UpdateError::Error(Box::new(e)))? .map_err(|e| UpdateError::Error(Box::new(e)))?
} }
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<Option<UpdateStatus>> { async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self let store = self
.store .store
.get(&uuid) .get(uuid)
.await? .await?
.ok_or(UpdateError::UnexistingIndex(uuid))?; .ok_or(UpdateError::UnexistingIndex(uuid))?;
let result = store let result = store
.meta(id) .meta(id)
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?
.ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result) Ok(result)
} }
async fn handle_delete(&self, uuid: Uuid) -> Result<()> { async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.delete(&uuid).await?; let store = self.store.delete(uuid).await?;
if let Some(store) = store { if let Some(store) = store {
tokio::task::spawn(async move { tokio::task::spawn(async move {
@ -246,7 +248,7 @@ where
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
pub async fn update_status(&self, uuid: Uuid, id: u64) -> Result<Option<UpdateStatus>> { pub async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret }; let msg = UpdateMsg::GetUpdate { uuid, id, ret };
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
@ -310,9 +312,9 @@ impl UpdateStoreStore for MapUpdateStoreStore {
} }
} }
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> { async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let guard = self.db.read().await; let guard = self.db.read().await;
match guard.get(uuid) { match guard.get(&uuid) {
Some(uuid) => Ok(Some(uuid.clone())), Some(uuid) => Ok(Some(uuid.clone())),
None => { None => {
// The index is not found in the found in the loaded indexes, so we attempt to load // The index is not found in the found in the loaded indexes, so we attempt to load
@ -322,7 +324,7 @@ impl UpdateStoreStore for MapUpdateStoreStore {
let path = self.path.clone().join(format!("updates-{}", uuid)); let path = self.path.clone().join(format!("updates-{}", uuid));
if path.exists() { if path.exists() {
let mut guard = self.db.write().await; let mut guard = self.db.write().await;
match guard.entry(uuid.clone()) { match guard.entry(uuid) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
// We can safely load the index // We can safely load the index
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
@ -333,7 +335,7 @@ impl UpdateStoreStore for MapUpdateStoreStore {
futures::executor::block_on(index_handle.update(meta, file)) futures::executor::block_on(index_handle.update(meta, file))
}) })
.map_err(|e| UpdateError::Error(e.into()))?; .map_err(|e| UpdateError::Error(e.into()))?;
let store = entry.insert(store.clone()); let store = entry.insert(store);
Ok(Some(store.clone())) Ok(Some(store.clone()))
} }
Entry::Occupied(entry) => { Entry::Occupied(entry) => {
@ -348,7 +350,7 @@ impl UpdateStoreStore for MapUpdateStoreStore {
} }
} }
async fn delete(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> { async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let store = self.db.write().await.remove(&uuid); let store = self.db.write().await.remove(&uuid);
let path = self.path.clone().join(format!("updates-{}", uuid)); let path = self.path.clone().join(format!("updates-{}", uuid));
if store.is_some() || path.exists() { if store.is_some() || path.exists() {

View File

@ -92,7 +92,7 @@ where
let update_store_weak = Arc::downgrade(&update_store); let update_store_weak = Arc::downgrade(&update_store);
tokio::task::spawn(async move { tokio::task::spawn(async move {
// Block and wait for something to process. // Block and wait for something to process.
'outer: while let Some(_) = notification_receiver.recv().await { 'outer: while notification_receiver.recv().await.is_some() {
loop { loop {
match update_store_weak.upgrade() { match update_store_weak.upgrade() {
Some(update_store) => { Some(update_store) => {
@ -276,7 +276,7 @@ where
updates.extend(failed); updates.extend(failed);
updates.sort_unstable_by(|a, b| a.id().cmp(&b.id())); updates.sort_by_key(|u| u.id());
Ok(updates) Ok(updates)
} }

View File

@ -216,7 +216,7 @@ impl HeedUuidStore {
impl UuidStore for HeedUuidStore { impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> { async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone(); let env = self.env.clone();
let db = self.db.clone(); let db = self.db;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?; let mut txn = env.write_txn()?;
match db.get(&txn, &name)? { match db.get(&txn, &name)? {
@ -240,7 +240,7 @@ impl UuidStore for HeedUuidStore {
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone(); let env = self.env.clone();
let db = self.db.clone(); let db = self.db;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?; let txn = env.read_txn()?;
match db.get(&txn, &name)? { match db.get(&txn, &name)? {
@ -255,7 +255,7 @@ impl UuidStore for HeedUuidStore {
async fn delete(&self, uid: String) -> Result<Option<Uuid>> { async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone(); let env = self.env.clone();
let db = self.db.clone(); let db = self.db;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?; let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? { match db.get(&txn, &uid)? {
@ -272,7 +272,7 @@ impl UuidStore for HeedUuidStore {
async fn list(&self) -> Result<Vec<(String, Uuid)>> { async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone(); let env = self.env.clone();
let db = self.db.clone(); let db = self.db;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?; let txn = env.read_txn()?;
let mut entries = Vec::new(); let mut entries = Vec::new();

View File

@ -105,7 +105,7 @@ async fn get_all_documents(
.attributes_to_retrieve .attributes_to_retrieve
.as_ref() .as_ref()
.map(|attrs| attrs .map(|attrs| attrs
.split(",") .split(',')
.map(String::from) .map(String::from)
.collect::<Vec<_>>()); .collect::<Vec<_>>());

View File

@ -37,13 +37,12 @@ async fn get_index(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
match data.index(&path.index_uid).await? { match data.index(path.index_uid.clone()).await {
Some(meta) => { Ok(meta) => {
let json = serde_json::to_string(&meta).unwrap(); let json = serde_json::to_string(&meta).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))
} }
None => { Err(e) => {
let e = format!("Index {:?} doesn't exist.", path.index_uid);
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
} }
} }
@ -61,7 +60,8 @@ async fn create_index(
data: web::Data<Data>, data: web::Data<Data>,
body: web::Json<IndexCreateRequest>, body: web::Json<IndexCreateRequest>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
match data.create_index(&body.uid, body.primary_key.clone()).await { let body = body.into_inner();
match data.create_index(body.uid, body.primary_key).await {
Ok(meta) => { Ok(meta) => {
let json = serde_json::to_string(&meta).unwrap(); let json = serde_json::to_string(&meta).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))
@ -95,7 +95,8 @@ async fn update_index(
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
body: web::Json<UpdateIndexRequest>, body: web::Json<UpdateIndexRequest>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
match data.update_index(&path.index_uid, body.primary_key.as_ref(), body.uid.as_ref()).await { let body = body.into_inner();
match data.update_index(path.into_inner().index_uid, body.primary_key, body.uid).await {
Ok(meta) => { Ok(meta) => {
let json = serde_json::to_string(&meta).unwrap(); let json = serde_json::to_string(&meta).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))
@ -133,16 +134,13 @@ async fn get_update_status(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<UpdateParam>, path: web::Path<UpdateParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let result = data.get_update_status(&path.index_uid, path.update_id).await; let params = path.into_inner();
let result = data.get_update_status(params.index_uid, params.update_id).await;
match result { match result {
Ok(Some(meta)) => { Ok(meta) => {
let json = serde_json::to_string(&meta).unwrap(); let json = serde_json::to_string(&meta).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))
} }
Ok(None) => {
let e = format!("update {} for index {:?} doesn't exists.", path.update_id, path.index_uid);
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
}
Err(e) => { Err(e) => {
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
} }
@ -154,7 +152,7 @@ async fn get_all_updates_status(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let result = data.get_updates_status(&path.index_uid).await; let result = data.get_updates_status(path.into_inner().index_uid).await;
match result { match result {
Ok(metas) => { Ok(metas) => {
let json = serde_json::to_string(&metas).unwrap(); let json = serde_json::to_string(&metas).unwrap();

View File

@ -36,19 +36,19 @@ impl TryFrom<SearchQueryGet> for SearchQuery {
fn try_from(other: SearchQueryGet) -> anyhow::Result<Self> { fn try_from(other: SearchQueryGet) -> anyhow::Result<Self> {
let attributes_to_retrieve = other let attributes_to_retrieve = other
.attributes_to_retrieve .attributes_to_retrieve
.map(|attrs| attrs.split(",").map(String::from).collect::<Vec<_>>()); .map(|attrs| attrs.split(',').map(String::from).collect::<Vec<_>>());
let attributes_to_crop = other let attributes_to_crop = other
.attributes_to_crop .attributes_to_crop
.map(|attrs| attrs.split(",").map(String::from).collect::<Vec<_>>()); .map(|attrs| attrs.split(',').map(String::from).collect::<Vec<_>>());
let attributes_to_highlight = other let attributes_to_highlight = other
.attributes_to_highlight .attributes_to_highlight
.map(|attrs| attrs.split(",").map(String::from).collect::<HashSet<_>>()); .map(|attrs| attrs.split(',').map(String::from).collect::<HashSet<_>>());
let facet_distributions = other let facet_distributions = other
.facet_distributions .facet_distributions
.map(|attrs| attrs.split(",").map(String::from).collect::<Vec<_>>()); .map(|attrs| attrs.split(',').map(String::from).collect::<Vec<_>>());
let facet_filters = match other.facet_filters { let facet_filters = match other.facet_filters {
Some(ref f) => Some(serde_json::from_str(f)?), Some(ref f) => Some(serde_json::from_str(f)?),
@ -83,7 +83,7 @@ async fn search_with_url_query(
return Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) return Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
} }
}; };
let search_result = data.search(&path.index_uid, query).await; let search_result = data.search(path.into_inner().index_uid, query).await;
match search_result { match search_result {
Ok(docs) => { Ok(docs) => {
let docs = serde_json::to_string(&docs).unwrap(); let docs = serde_json::to_string(&docs).unwrap();
@ -101,7 +101,7 @@ async fn search_with_post(
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
params: web::Json<SearchQuery>, params: web::Json<SearchQuery>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let search_result = data.search(&path.index_uid, params.into_inner()).await; let search_result = data.search(path.into_inner().index_uid, params.into_inner()).await;
match search_result { match search_result {
Ok(docs) => { Ok(docs) => {
let docs = serde_json::to_string(&docs).unwrap(); let docs = serde_json::to_string(&docs).unwrap();

View File

@ -64,7 +64,7 @@ macro_rules! make_setting_route {
data: actix_web::web::Data<data::Data>, data: actix_web::web::Data<data::Data>,
index_uid: actix_web::web::Path<String>, index_uid: actix_web::web::Path<String>,
) -> std::result::Result<HttpResponse, ResponseError> { ) -> std::result::Result<HttpResponse, ResponseError> {
match data.settings(index_uid.as_ref()).await { match data.settings(index_uid.into_inner()).await {
Ok(settings) => { Ok(settings) => {
let setting = settings.$attr; let setting = settings.$attr;
let json = serde_json::to_string(&setting).unwrap(); let json = serde_json::to_string(&setting).unwrap();
@ -153,7 +153,7 @@ async fn get_all(
data: web::Data<Data>, data: web::Data<Data>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
match data.settings(index_uid.as_ref()).await { match data.settings(index_uid.into_inner()).await {
Ok(settings) => { Ok(settings) => {
let json = serde_json::to_string(&settings).unwrap(); let json = serde_json::to_string(&settings).unwrap();
Ok(HttpResponse::Ok().body(json)) Ok(HttpResponse::Ok().body(json))

View File

@ -69,5 +69,5 @@ async fn test_create_multiple_indexes() {
assert_eq!(index1.get().await.1, 200); assert_eq!(index1.get().await.1, 200);
assert_eq!(index2.get().await.1, 200); assert_eq!(index2.get().await.1, 200);
assert_eq!(index3.get().await.1, 200); assert_eq!(index3.get().await.1, 200);
assert_eq!(index4.get().await.1, 404); assert_eq!(index4.get().await.1, 400);
} }

View File

@ -14,7 +14,7 @@ async fn create_and_delete_index() {
assert_eq!(code, 200); assert_eq!(code, 200);
assert_eq!(index.get().await.1, 404); assert_eq!(index.get().await.1, 400);
} }
#[actix_rt::test] #[actix_rt::test]

View File

@ -29,7 +29,7 @@ async fn get_unexisting_index() {
let (_response, code) = index.get().await; let (_response, code) = index.get().await;
assert_eq!(code, 404); assert_eq!(code, 400);
} }
#[actix_rt::test] #[actix_rt::test]