implement index delete

This commit is contained in:
mpostma 2021-03-06 12:57:56 +01:00
parent 86211b1ddd
commit d9254c4355
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 171 additions and 30 deletions

View File

@ -51,12 +51,10 @@ impl Data {
pub async fn delete_index( pub async fn delete_index(
&self, &self,
_index: impl AsRef<str> + Send + Sync + 'static, index: impl AsRef<str> + Send + Sync + 'static,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
todo!() self.index_controller.delete_index(index.as_ref().to_owned()).await?;
//let index_controller = self.index_controller.clone(); Ok(())
//tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??;
//Ok(())
} }
pub async fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> { pub async fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> {

View File

@ -1,19 +1,20 @@
use std::collections::{hash_map::Entry, HashMap}; use std::collections::{hash_map::Entry, HashMap};
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File, remove_dir_all};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use async_stream::stream; use async_stream::stream;
use chrono::Utc; use chrono::Utc;
use futures::pin_mut;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::info; use log::info;
use std::future::Future;
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
use std::future::Future;
use futures::pin_mut;
use super::get_arc_ownership_blocking;
use super::update_handler::UpdateHandler; use super::update_handler::UpdateHandler;
use crate::index::UpdateResult as UResult; use crate::index::UpdateResult as UResult;
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
@ -59,7 +60,11 @@ enum IndexMsg {
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
doc_id: String, doc_id: String,
ret: oneshot::Sender<Result<Document>>, ret: oneshot::Sender<Result<Document>>,
} },
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
} }
struct IndexActor<S> { struct IndexActor<S> {
@ -82,13 +87,14 @@ trait IndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMetadata>; async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMetadata>;
async fn get_or_create(&self, uuid: Uuid) -> Result<Index>; async fn get_or_create(&self, uuid: Uuid) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>; async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>>;
} }
impl<S: IndexStore + Sync + Send> IndexActor<S> { impl<S: IndexStore + Sync + Send> IndexActor<S> {
fn new( fn new(
read_receiver: mpsc::Receiver<IndexMsg>, read_receiver: mpsc::Receiver<IndexMsg>,
write_receiver: mpsc::Receiver<IndexMsg>, write_receiver: mpsc::Receiver<IndexMsg>,
store: S store: S,
) -> Self { ) -> Self {
let options = IndexerOpts::default(); let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).unwrap(); let update_handler = UpdateHandler::new(&options).unwrap();
@ -149,7 +155,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
//futures.push(fut2); //futures.push(fut2);
//futures.for_each(f) //futures.for_each(f)
tokio::join!(fut1, fut2); tokio::join!(fut1, fut2);
} }
async fn handle_message(&self, msg: IndexMsg) { async fn handle_message(&self, msg: IndexMsg) {
@ -173,7 +178,18 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret)
.await .await
} }
Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await, Document {
uuid,
attributes_to_retrieve,
doc_id,
ret,
} => {
self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret)
.await
}
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
},
} }
} }
@ -236,10 +252,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
) { ) {
let index = self.store.get(uuid).await.unwrap().unwrap(); let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = index.retrieve_documents(offset, limit, attributes_to_retrieve) let result = index
.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)); .map_err(|e| IndexError::Error(e));
let _ = ret.send(result); let _ = ret.send(result);
}).await; })
.await;
} }
async fn handle_fetch_document( async fn handle_fetch_document(
@ -251,10 +269,29 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
) { ) {
let index = self.store.get(uuid).await.unwrap().unwrap(); let index = self.store.get(uuid).await.unwrap().unwrap();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let result = index.retrieve_document(doc_id, attributes_to_retrieve) let result = index
.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(|e| IndexError::Error(e)); .map_err(|e| IndexError::Error(e));
let _ = ret.send(result); let _ = ret.send(result);
}).await; })
.await;
}
/// Removes the index `uuid` from the store and schedules the closing of its handle.
///
/// An error from `store.delete` is returned as is. When the store held no entry
/// for `uuid`, nothing is scheduled and `Ok(())` is returned.
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let index = self.store.delete(&uuid).await?;
if let Some(index) = index {
// Detached task: the join handle is dropped, so the caller is answered
// without waiting for the index to be closed.
tokio::task::spawn(async move {
let index = index.0;
// Polls until this is the last `Arc` pointing at the index, then unwraps it.
// Despite its name, `store` holds the unwrapped index value.
let store = get_arc_ownership_blocking(index).await;
// `wait()` is a synchronous call, hence the hop to `spawn_blocking`.
tokio::task::spawn_blocking(move || {
store.prepare_for_closing().wait();
info!("Index {} was closed.", uuid);
});
});
}
Ok(())
} }
} }
@ -272,7 +309,10 @@ impl IndexActorHandle {
let store = MapIndexStore::new(path); let store = MapIndexStore::new(path);
let actor = IndexActor::new(read_receiver, write_receiver, store); let actor = IndexActor::new(read_receiver, write_receiver, store);
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Self { read_sender, write_sender } Self {
read_sender,
write_sender,
}
} }
pub async fn create_index( pub async fn create_index(
@ -346,6 +386,13 @@ impl IndexActorHandle {
let _ = self.read_sender.send(msg).await; let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
/// Asks the index actor to delete the index identified by `uuid` and waits for its answer.
///
/// The request is posted on the read channel; the actor replies through a
/// oneshot channel created for this call.
///
/// # Panics
///
/// Panics with "IndexActor has been killed" when the reply channel is closed
/// before an answer arrives.
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
    let (ret, reply) = oneshot::channel();
    // The send result is ignored on purpose; a lost message shows up as a
    // closed reply channel on the await below.
    let _ = self.read_sender.send(IndexMsg::Delete { uuid, ret }).await;
    let outcome = reply.await.expect("IndexActor has been killed");
    Ok(outcome?)
}
} }
struct MapIndexStore { struct MapIndexStore {
@ -408,6 +455,15 @@ impl IndexStore for MapIndexStore {
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> { async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
Ok(self.index_store.read().await.get(&uuid).cloned()) Ok(self.index_store.read().await.get(&uuid).cloned())
} }
async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>> {
let index = self.index_store.write().await.remove(&uuid);
if index.is_some() {
let db_path = self.root.join(format!("index-{}", uuid));
remove_dir_all(db_path).unwrap();
}
Ok(index)
}
} }
impl MapIndexStore { impl MapIndexStore {

View File

@ -1,11 +1,13 @@
mod updates;
mod index_actor; mod index_actor;
mod update_actor; mod update_actor;
mod uuid_resolver;
mod update_store;
mod update_handler; mod update_handler;
mod update_store;
mod updates;
mod uuid_resolver;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use actix_web::web::{Bytes, Payload}; use actix_web::web::{Bytes, Payload};
use anyhow::Context; use anyhow::Context;
@ -14,6 +16,7 @@ use futures::stream::StreamExt;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
pub use updates::{Processed, Processing, Failed}; pub use updates::{Processed, Processing, Failed};
@ -146,8 +149,14 @@ impl IndexController {
Ok(index_meta) Ok(index_meta)
} }
/// Deletes the index named `index_uid`.
///
/// The name is first removed from the uuid resolver; when the resolver answers
/// `None`, the call fails with the context message "index not found". The update
/// actor and then the index actor are asked to delete what they hold for the
/// returned uuid, and the first error stops the sequence.
// NOTE(review): the name is unregistered before the two deletions run, so an
// error from either handle returns early after the name is already gone —
// confirm this ordering is intended.
fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { pub async fn delete_index(&self, index_uid: String) -> anyhow::Result<()> {
todo!() let uuid = self.uuid_resolver
.delete(index_uid)
.await?
.context("index not found")?;
self.update_handle.delete(uuid.clone()).await?;
self.index_handle.delete(uuid).await?;
Ok(())
} }
pub async fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> { pub async fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> {
@ -219,3 +228,16 @@ impl IndexController {
Ok(result) Ok(result)
} }
} }
/// Waits until `item` is the only strong reference to its value, then returns the value.
///
/// `Arc::try_unwrap` is attempted right away and, after each failure, again
/// following a 100 ms asynchronous sleep. The future never completes while
/// another strong reference stays alive.
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
    loop {
        // On failure `try_unwrap` hands the `Arc` back, so it is kept for the next attempt.
        item = match Arc::try_unwrap(item) {
            Ok(inner) => return inner,
            Err(still_shared) => still_shared,
        };
        sleep(Duration::from_millis(100)).await;
    }
}

View File

@ -1,19 +1,20 @@
use std::collections::{hash_map::Entry, HashMap}; use std::collections::{hash_map::Entry, HashMap};
use std::fs::create_dir_all; use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use super::index_actor::IndexActorHandle; use itertools::Itertools;
use log::info; use log::info;
use super::index_actor::IndexActorHandle;
use thiserror::Error; use thiserror::Error;
use tokio::fs::File; use tokio::fs::File;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid; use uuid::Uuid;
use itertools::Itertools;
use crate::index::UpdateResult; use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
use super::get_arc_ownership_blocking;
pub type Result<T> = std::result::Result<T, UpdateError>; pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
@ -42,7 +43,11 @@ enum UpdateMsg<D> {
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<Result<Option<UpdateStatus>>>, ret: oneshot::Sender<Result<Option<UpdateStatus>>>,
id: u64, id: u64,
} },
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
} }
struct UpdateActor<D, S> { struct UpdateActor<D, S> {
@ -54,6 +59,7 @@ struct UpdateActor<D, S> {
#[async_trait::async_trait] #[async_trait::async_trait]
trait UpdateStoreStore { trait UpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>; async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
async fn delete(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>;
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>; async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>;
} }
@ -89,6 +95,9 @@ where
Some(GetUpdate { uuid, ret, id }) => { Some(GetUpdate { uuid, ret, id }) => {
let _ = ret.send(self.handle_get_update(uuid, id).await); let _ = ret.send(self.handle_get_update(uuid, id).await);
} }
Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await);
}
None => {} None => {}
} }
} }
@ -173,6 +182,24 @@ where
.map_err(|e| UpdateError::Error(Box::new(e)))?; .map_err(|e| UpdateError::Error(Box::new(e)))?;
Ok(result) Ok(result)
} }
/// Removes the update store of `uuid` and schedules its closing in the background.
///
/// An error from the underlying `delete` is returned unchanged. When no store
/// was registered for `uuid`, the call is a no-op that returns `Ok(())`.
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
    let shared_store = match self.store.delete(&uuid).await? {
        Some(store) => store,
        None => return Ok(()),
    };

    // Detached task: the reply to the caller does not wait for the store to close.
    tokio::task::spawn(async move {
        // Wait for every other `Arc` clone of the store to be dropped.
        let owned_store = get_arc_ownership_blocking(shared_store).await;
        // `wait()` is a synchronous call, so it runs through `spawn_blocking`.
        tokio::task::spawn_blocking(move || {
            owned_store.prepare_for_closing().wait();
            info!("Update store {} was closed.", uuid);
        });
    });

    Ok(())
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -225,6 +252,13 @@ where
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.") receiver.await.expect("update actor killed.")
} }
/// Asks the update actor to delete the update store of `uuid` and returns its answer.
///
/// # Panics
///
/// Panics with "update actor killed." when the reply channel is closed before
/// an answer arrives.
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
    let (ret, reply) = oneshot::channel();
    // The send result is ignored on purpose; a lost message shows up as a
    // closed reply channel on the await below.
    let _ = self.sender.send(UpdateMsg::Delete { uuid, ret }).await;
    reply.await.expect("update actor killed.")
}
} }
struct MapUpdateStoreStore { struct MapUpdateStoreStore {
@ -269,4 +303,13 @@ impl UpdateStoreStore for MapUpdateStoreStore {
async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> { async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> {
Ok(self.db.read().await.get(uuid).cloned()) Ok(self.db.read().await.get(uuid).cloned())
} }
async fn delete(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> {
let store = self.db.write().await.remove(&uuid);
if store.is_some() {
let path = self.path.clone().join(format!("updates-{}", uuid));
remove_dir_all(path).unwrap();
}
Ok(store)
}
} }

View File

@ -22,6 +22,10 @@ enum UuidResolveMsg {
name: String, name: String,
ret: oneshot::Sender<Result<Uuid>>, ret: oneshot::Sender<Result<Uuid>>,
}, },
Delete {
name: String,
ret: oneshot::Sender<Result<Option<Uuid>>>,
},
} }
struct UuidResolverActor<S> { struct UuidResolverActor<S> {
@ -45,6 +49,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(Create { name, ret }) => self.handle_create(name, ret).await, Some(Create { name, ret }) => self.handle_create(name, ret).await,
Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await, Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await,
Some(Resolve { name, ret }) => self.handle_resolve(name, ret).await, Some(Resolve { name, ret }) => self.handle_resolve(name, ret).await,
Some(Delete { name, ret }) => self.handle_delete(name, ret).await,
// all senders have been dropped, need to quit. // all senders have been dropped, need to quit.
None => break, None => break,
} }
@ -64,7 +69,12 @@ impl<S: UuidStore> UuidResolverActor<S> {
} }
async fn handle_resolve(&self, name: String, ret: oneshot::Sender<Result<Option<Uuid>>>) { async fn handle_resolve(&self, name: String, ret: oneshot::Sender<Result<Option<Uuid>>>) {
let result = self.store.get_uuid(name).await; let result = self.store.get_uuid(&name).await;
let _ = ret.send(result);
}
/// Deletes `name` from the store and sends the outcome back through `ret`.
///
/// A failed send is ignored.
async fn handle_delete(&self, name: String, ret: oneshot::Sender<Result<Option<Uuid>>>) {
let result = self.store.delete(&name).await;
let _ = ret.send(result); let _ = ret.send(result);
} }
} }
@ -103,6 +113,13 @@ impl UuidResolverHandle {
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("Uuid resolver actor has been killed")?) Ok(receiver.await.expect("Uuid resolver actor has been killed")?)
} }
/// Asks the uuid resolver actor to delete the entry for `name`.
///
/// Resolves to the actor's answer, with an actor-side error converted into
/// `anyhow::Error`.
///
/// # Panics
///
/// Panics with "Uuid resolver actor has been killed" when the reply channel is
/// closed before an answer arrives.
pub async fn delete(&self, name: String) -> anyhow::Result<Option<Uuid>> {
    let (ret, reply) = oneshot::channel();
    // The send result is ignored on purpose; a lost message shows up as a
    // closed reply channel on the await below.
    let _ = self.sender.send(UuidResolveMsg::Delete { name, ret }).await;
    let removed = reply.await.expect("Uuid resolver actor has been killed")?;
    Ok(removed)
}
} }
#[derive(Clone, Debug, Error)] #[derive(Clone, Debug, Error)]
@ -116,7 +133,8 @@ trait UuidStore {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise. // the uuid otherwise.
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid>; async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid>;
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>>; async fn get_uuid(&self, name: &str) -> Result<Option<Uuid>>;
async fn delete(&self, name: &str) -> Result<Option<Uuid>>;
} }
struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>); struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>);
@ -140,7 +158,11 @@ impl UuidStore for MapUuidStore {
} }
} }
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { async fn get_uuid(&self, name: &str) -> Result<Option<Uuid>> {
Ok(self.0.read().await.get(&name).cloned()) Ok(self.0.read().await.get(name).cloned())
}
/// Removes the entry for `name` from the map under the write lock.
///
/// Returns the uuid that was stored for `name`, or `None` when there was no
/// such entry. This implementation always returns `Ok`.
async fn delete(&self, name: &str) -> Result<Option<Uuid>> {
Ok(self.0.write().await.remove(name))
} }
} }