143: Shared update store r=irevoire a=MarinPostma

This PR changes the update process so that a single instance of the update store is shared among all indexes.

This allows updates to always be processed sequentially without additional synchronization, and fixes the bug where the first pending update of every index was reported as processing even though only one of them actually was.

EDIT:

I ended up having to rewrite the whole `UpdateStore` so that updates are truly queued and processed sequentially, in the order they were added. For that purpose I created a `pending_queue` that orders the updates by a global update id.

To find the next `update_id` to use, both globally and per index, I created another database that holds the next id to use.
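
As a rough illustration, with a `HashMap` standing in for the actual heed database and illustrative key names, the bookkeeping boils down to bumping two counters, one global and one per index, in the same transaction that enqueues the update:

```rust
use std::collections::HashMap;

// Hypothetical sketch: hand out the next global and per-index update ids.
// In the real store this lives in a heed database, so both increments are
// part of the same write transaction as the enqueue itself.
fn next_ids(db: &mut HashMap<String, u64>, index_uuid: &str) -> (u64, u64) {
    let global = {
        let entry = db.entry("global".to_string()).or_insert(0);
        let id = *entry;
        *entry += 1;
        id
    };
    let per_index = {
        let entry = db.entry(format!("index-{}", index_uuid)).or_insert(0);
        let id = *entry;
        *entry += 1;
        id
    };
    (global, per_index)
}

fn main() {
    let mut db = HashMap::new();
    assert_eq!(next_ids(&mut db, "abc"), (0, 0));
    assert_eq!(next_ids(&mut db, "abc"), (1, 1));
}
```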

Finally, all updates that have been processed (successfully or otherwise) are stored in an `updates` database.

The key layout of these databases makes it easy to iterate over the entries for a particular index, and greatly reduces the amount of code needed to do so compared to the former implementation.
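
A minimal sketch of what such a key layout can look like, assuming a composite `(global_update_id, index_uuid, index_update_id)` key (the exact field order and types are assumptions): big-endian integers sort lexicographically in numeric order, so plainly iterating over the database walks the pending queue in global order, while the embedded uuid still lets us pick out a single index's updates.

```rust
// Hypothetical composite key for the pending queue. Big-endian encoding
// makes the raw byte order match the numeric order of the ids.
fn pending_key(global_id: u64, index_uuid: [u8; 16], update_id: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 16 + 8);
    key.extend_from_slice(&global_id.to_be_bytes());
    key.extend_from_slice(&index_uuid);
    key.extend_from_slice(&update_id.to_be_bytes());
    key
}

fn main() {
    let uuid = [7u8; 16];
    // Iterating keys in byte order yields updates in global order.
    assert!(pending_key(1, uuid, 0) < pending_key(2, uuid, 1));
}
```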

I have also simplified the locking mechanism of the update store, thanks to the `StateLock` data structure, which allows an arbitrary number of readers and a single writer to access the state concurrently. The current state can be either `Idle`, `Processing`, or `Snapshotting`. While an update or a snapshot is being performed, the process holds the state lock until its task is done, then sets the state back to `Idle`.
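
For reference, here is a condensed version of that structure, based on the `update_store.rs` diff at the bottom of this page (the `read` helper and the `main` usage are added here only for illustration):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;
use parking_lot::{Mutex, MutexGuard};

enum State {
    Idle,
    Processing,
    Snapshotting,
}

/// Readers load the current state lock-free through the ArcSwap, while
/// the Mutex guarantees there is at most one writer at any time.
struct StateLock {
    lock: Mutex<()>,
    data: ArcSwap<State>,
}

struct StateLockGuard<'a> {
    _lock: MutexGuard<'a, ()>,
    state: &'a StateLock,
}

impl StateLock {
    fn from_state(state: State) -> Self {
        Self { lock: Mutex::new(()), data: ArcSwap::from(Arc::new(state)) }
    }

    /// Any number of readers can observe the state concurrently.
    fn read(&self) -> Arc<State> {
        self.data.load_full()
    }

    /// Exclusive access: the guard holds the mutex until it is dropped.
    fn write(&self) -> StateLockGuard<'_> {
        StateLockGuard { _lock: self.lock.lock(), state: self }
    }
}

impl StateLockGuard<'_> {
    fn swap(&self, state: State) -> Arc<State> {
        self.state.data.swap(Arc::new(state))
    }
}

fn main() {
    let state = StateLock::from_state(State::Idle);
    let guard = state.write(); // we are now the only possible writer
    guard.swap(State::Processing); // readers observe Processing
    // ... process the update ...
    guard.swap(State::Idle); // set the state back before releasing
    drop(guard);
    assert!(matches!(*state.read(), State::Idle));
}
```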

I have made other small improvements here and there, and have left some others as future work:
- When creating an update file to hold a request's content, it would be preferable to first create a temporary file and then atomically persist it once we have written to it (see the sketch after this list). This would simplify the case where there is no data to write to the file, since we wouldn't have to clean up after ourselves.
- The logic for content validation should be factored out.
- Add more tests around error handling in the `process_pending_update` function.
- Issue #159
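
A minimal sketch of the first bullet, assuming the `tempfile` crate (the destination file name is made up):

```rust
use std::io::Write;
use std::path::Path;

use tempfile::NamedTempFile;

// Write the payload to a temporary file, then atomically persist it.
// If the payload is empty we just return: dropping the NamedTempFile
// deletes it, so there is nothing to clean up after ourselves.
fn store_update(dir: &Path, payload: &[u8]) -> anyhow::Result<()> {
    let mut file = NamedTempFile::new_in(dir)?;
    if payload.is_empty() {
        return Ok(()); // the temp file is removed on drop
    }
    file.write_all(payload)?;
    file.flush()?;
    // Atomic rename into place: readers never see a partial file.
    file.persist(dir.join("update_42"))?;
    Ok(())
}
```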

close #114


Co-authored-by: Marin Postma <postma.marin@protonmail.com>
bors[bot] 2021-04-27 18:41:55 +00:00 committed by GitHub
commit 8bc7dd8b03
33 changed files with 1139 additions and 1034 deletions

Cargo.lock

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix-codec"
version = "0.4.0"
@ -286,6 +288,12 @@ version = "1.0.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b"
[[package]]
name = "arc-swap"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73"
[[package]]
name = "assert-json-diff"
version = "1.0.1"
@ -295,19 +303,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "async-compression"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537"
dependencies = [
"flate2",
"futures-core",
"memchr",
"pin-project-lite 0.2.6",
"tokio 0.2.25",
]
[[package]]
name = "async-stream"
version = "0.3.1"
@ -775,16 +770,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if 1.0.0",
"num_cpus",
]
[[package]]
name = "debugid"
version = "0.7.2"
@ -1751,8 +1736,8 @@ dependencies = [
"actix-web",
"actix-web-static-files",
"anyhow",
"arc-swap",
"assert-json-diff",
"async-compression",
"async-stream",
"async-trait",
"byte-unit",
@ -1760,7 +1745,6 @@ dependencies = [
"cargo_toml",
"chrono",
"crossbeam-channel",
"dashmap",
"either",
"env_logger 0.8.3",
"flate2",


@ -28,14 +28,13 @@ actix-service = "2.0.0"
actix-web = { version = "=4.0.0-beta.6", features = ["rustls"] }
actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "6db8c3e", optional = true }
anyhow = "1.0.36"
async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] }
async-stream = "0.3.0"
async-trait = "0.1.42"
arc-swap = "1.2.0"
byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
bytes = "0.6.0"
chrono = { version = "0.4.19", features = ["serde"] }
crossbeam-channel = "0.5.0"
dashmap = "4.0.2"
either = "1.6.1"
env_logger = "0.8.2"
flate2 = "1.0.19"


@ -1,12 +1,10 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use sha2::Digest;
use crate::index::Settings;
use crate::index_controller::{IndexController, IndexStats};
use crate::index_controller::{IndexController, IndexStats, Stats};
use crate::index_controller::{IndexMetadata, IndexSettings};
use crate::option::Opt;
@ -39,13 +37,6 @@ pub struct ApiKeys {
pub master: Option<String>,
}
#[derive(Default)]
pub struct Stats {
pub database_size: u64,
pub last_update: Option<DateTime<Utc>>,
pub indexes: HashMap<String, IndexStats>,
}
impl ApiKeys {
pub fn generate_missing_api_keys(&mut self) {
if let Some(master_key) = &self.master {
@ -77,11 +68,7 @@ impl Data {
api_keys.generate_missing_api_keys();
let inner = DataInner {
index_controller,
options,
api_keys,
};
let inner = DataInner { index_controller, api_keys, options };
let inner = Arc::new(inner);
Ok(Data { inner })
@ -114,31 +101,11 @@ impl Data {
}
pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
Ok(self.index_controller.get_stats(uid).await?)
Ok(self.index_controller.get_index_stats(uid).await?)
}
pub async fn get_stats(&self) -> anyhow::Result<Stats> {
let mut stats = Stats::default();
stats.database_size += self.index_controller.get_uuids_size().await?;
for index in self.index_controller.list_indexes().await? {
let index_stats = self.index_controller.get_stats(index.uid.clone()).await?;
stats.database_size += index_stats.size;
stats.database_size += self
.index_controller
.get_updates_size(index.uid.clone())
.await?;
stats.last_update = Some(match stats.last_update {
Some(last_update) => last_update.max(index.meta.updated_at),
None => index.meta.updated_at,
});
stats.indexes.insert(index.uid, index_stats);
}
Ok(stats)
pub async fn get_all_stats(&self) -> anyhow::Result<Stats> {
Ok(self.index_controller.get_all_stats().await?)
}
#[inline]


@ -6,9 +6,9 @@ use anyhow::{bail, Context};
use milli::obkv_to_json;
use serde_json::{Map, Value};
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Facets, Settings, UpdateResult};
use crate::helpers::EnvSizer;
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Facets, Settings};
mod search;
mod updates;
@ -59,9 +59,7 @@ impl Index {
})
.transpose()?
.unwrap_or_else(BTreeSet::new);
let distinct_attribute = self
.distinct_attribute(&txn)?
.map(String::from);
let distinct_attribute = self.distinct_attribute(&txn)?.map(String::from);
Ok(Settings {
displayed_attributes: Some(Some(displayed_attributes)),


@ -4,17 +4,11 @@ use std::num::NonZeroUsize;
use flate2::read::GzDecoder;
use log::info;
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{de::Deserializer, Deserialize, Serialize};
use super::Index;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
DocumentsAddition(DocumentAdditionResult),
DocumentDeletion { deleted: u64 },
Other,
}
use crate::index_controller::UpdateResult;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
@ -91,7 +85,7 @@ impl Index {
&self,
format: UpdateFormat,
method: IndexDocumentsMethod,
content: impl io::Read,
content: Option<impl io::Read>,
update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> {
@ -108,16 +102,15 @@ impl Index {
builder.update_format(format);
builder.index_documents_method(method);
let gzipped = false;
let reader = if gzipped {
Box::new(GzDecoder::new(content))
} else {
Box::new(content) as Box<dyn io::Read>
};
let indexing_callback =
|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
let result = builder.execute(reader, |indexing_step, update_id| {
info!("update {}: {:?}", update_id, indexing_step)
});
let gzipped = false;
let result = match content {
Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback),
Some(content) => builder.execute(content, indexing_callback),
None => builder.execute(std::io::empty(), indexing_callback),
};
info!("document addition done: {:?}", result);
@ -228,10 +221,13 @@ impl Index {
pub fn delete_documents(
&self,
document_ids: impl io::Read,
document_ids: Option<impl io::Read>,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
let ids: Vec<String> = serde_json::from_reader(document_ids)?;
let ids = match document_ids {
Some(reader) => serde_json::from_reader(reader)?,
None => Vec::<String>::new(),
};
let mut txn = self.write_txn()?;
let mut builder = update_builder.delete_documents(&mut txn, self)?;


@ -1,96 +1,59 @@
use std::fs::File;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use async_stream::stream;
use futures::pin_mut;
use futures::stream::StreamExt;
use heed::CompactionOption;
use log::debug;
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::update_handler::UpdateHandler;
use crate::index_controller::{
get_arc_ownership_blocking, updates::Processing, IndexStats, UpdateMeta,
get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed,
Processing,
};
use crate::option::IndexerOpts;
use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult};
use super::{IndexError, IndexMeta, IndexMsg, IndexResult, IndexSettings, IndexStore};
pub const CONCURRENT_INDEX_MSG: usize = 10;
pub struct IndexActor<S> {
read_receiver: Option<mpsc::Receiver<IndexMsg>>,
write_receiver: Option<mpsc::Receiver<IndexMsg>>,
receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>,
processing: RwLock<Option<Uuid>>,
store: S,
}
impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new(
read_receiver: mpsc::Receiver<IndexMsg>,
write_receiver: mpsc::Receiver<IndexMsg>,
store: S,
) -> Result<Self> {
pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver);
let write_receiver = Some(write_receiver);
Ok(Self {
read_receiver,
write_receiver,
update_handler,
processing: RwLock::new(None),
store,
})
let receiver = Some(receiver);
Ok(Self { receiver, update_handler, store })
}
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
/// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time.
pub async fn run(mut self) {
let mut read_receiver = self
.read_receiver
let mut receiver = self
.receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let read_stream = stream! {
let stream = stream! {
loop {
match read_receiver.recv().await {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
let mut write_receiver = self
.write_receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let write_stream = stream! {
loop {
match write_receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
pin_mut!(write_stream);
pin_mut!(read_stream);
let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg));
let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg));
let fut1: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut1);
let fut2: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut2);
tokio::join!(fut1, fut2);
stream
.for_each_concurrent(Some(CONCURRENT_INDEX_MSG), |msg| self.handle_message(msg))
.await;
}
async fn handle_message(&self, msg: IndexMsg) {
@ -103,8 +66,13 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
} => {
let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
}
Update { ret, meta, data } => {
let _ = ret.send(self.handle_update(meta, data).await);
Update {
ret,
meta,
data,
uuid,
} => {
let _ = ret.send(self.handle_update(uuid, meta, data).await);
}
Search { ret, query, uuid } => {
let _ = ret.send(self.handle_search(uuid, query).await);
@ -170,7 +138,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
) -> IndexResult<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
@ -180,31 +148,23 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
async fn handle_update(
&self,
meta: Processing<UpdateMeta>,
data: File,
) -> Result<UpdateResult> {
async fn get_result<S: IndexStore>(actor: &IndexActor<S>, meta: Processing<UpdateMeta>, data: File) -> Result<UpdateResult> {
debug!("Processing update {}", meta.id());
let uuid = *meta.index_uuid();
let update_handler = actor.update_handler.clone();
let index = match actor.store.get(uuid).await? {
Some(index) => index,
None => actor.store.create(uuid, None).await?,
};
uuid: Uuid,
meta: Processing,
data: Option<File>,
) -> IndexResult<Result<Processed, Failed>> {
debug!("Processing update {}", meta.id());
let update_handler = self.update_handler.clone();
let index = match self.store.get(uuid).await? {
Some(index) => index,
None => self.store.create(uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
}
*self.processing.write().await = Some(*meta.index_uuid());
let result = get_result(self, meta, data).await;
*self.processing.write().await = None;
result
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings> {
let index = self
.store
.get(uuid)
@ -221,7 +181,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
) -> IndexResult<Vec<Document>> {
let index = self
.store
.get(uuid)
@ -241,7 +201,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
) -> IndexResult<Document> {
let index = self
.store
.get(uuid)
@ -256,7 +216,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> {
let index = self.store.delete(uuid).await?;
if let Some(index) = index {
@ -273,7 +233,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(())
}
async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index))
@ -289,7 +249,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
) -> IndexResult<IndexMeta> {
let index = self
.store
.get(uuid)
@ -316,7 +276,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
@ -346,23 +306,20 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(())
}
async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> {
async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
let processing = self.processing.read().await;
let is_indexing = *processing == Some(uuid);
spawn_blocking(move || {
let rtxn = index.read_txn()?;
Ok(IndexStats {
size: index.size(),
number_of_documents: index.number_of_documents(&rtxn)?,
is_indexing,
is_indexing: None,
fields_distribution: index.fields_distribution(&rtxn)?,
})
})


@ -3,55 +3,64 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{updates::Processing, UpdateMeta};
use crate::index_controller::{IndexSettings, IndexStats};
use super::{
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
use crate::index_controller::{IndexSettings, IndexStats, Processing};
use crate::{
index::{Document, SearchQuery, SearchResult, Settings},
index_controller::{Failed, Processed},
};
use super::{IndexActor, IndexActorHandle, IndexMeta, IndexMsg, IndexResult, MapIndexStore};
#[derive(Clone)]
pub struct IndexActorHandleImpl {
read_sender: mpsc::Sender<IndexMsg>,
write_sender: mpsc::Sender<IndexMsg>,
sender: mpsc::Sender<IndexMsg>,
}
#[async_trait::async_trait]
impl IndexActorHandle for IndexActorHandleImpl {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
async fn create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex {
ret,
uuid,
primary_key,
};
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult> {
uuid: Uuid,
meta: Processing,
data: Option<std::fs::File>,
) -> anyhow::Result<Result<Processed, Failed>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { ret, meta, data };
let _ = self.read_sender.send(msg).await;
let msg = IndexMsg::Update {
ret,
meta,
data,
uuid,
};
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult<SearchResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret };
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn settings(&self, uuid: Uuid) -> Result<Settings> {
async fn settings(&self, uuid: Uuid) -> IndexResult<Settings> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret };
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
@ -61,7 +70,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
) -> IndexResult<Vec<Document>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Documents {
uuid,
@ -70,7 +79,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
attributes_to_retrieve,
limit,
};
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
@ -79,7 +88,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
) -> IndexResult<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
@ -87,61 +96,61 @@ impl IndexActorHandle for IndexActorHandleImpl {
doc_id,
attributes_to_retrieve,
};
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn delete(&self, uuid: Uuid) -> Result<()> {
async fn delete(&self, uuid: Uuid) -> IndexResult<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret };
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
async fn get_index_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta> {
async fn update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex {
uuid,
index_settings,
ret,
};
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { uuid, path, ret };
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetStats { uuid, ret };
let _ = self.read_sender.send(msg).await;
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
}
impl IndexActorHandleImpl {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100);
let (sender, receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path, index_size);
let actor = IndexActor::new(read_receiver, write_receiver, store)?;
let actor = IndexActor::new(receiver, store)?;
tokio::task::spawn(actor.run());
Ok(Self {
read_sender,
write_sender,
})
Ok(Self { sender })
}
}


@ -4,20 +4,21 @@ use tokio::sync::oneshot;
use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{updates::Processing, IndexStats, UpdateMeta};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use super::{IndexMeta, IndexSettings, Result, UpdateResult};
use super::{IndexMeta, IndexResult, IndexSettings};
pub enum IndexMsg {
CreateIndex {
uuid: Uuid,
primary_key: Option<String>,
ret: oneshot::Sender<Result<IndexMeta>>,
ret: oneshot::Sender<IndexResult<IndexMeta>>,
},
Update {
meta: Processing<UpdateMeta>,
data: std::fs::File,
ret: oneshot::Sender<Result<UpdateResult>>,
uuid: Uuid,
meta: Processing,
data: Option<std::fs::File>,
ret: oneshot::Sender<IndexResult<Result<Processed, Failed>>>,
},
Search {
uuid: Uuid,
@ -26,41 +27,41 @@ pub enum IndexMsg {
},
Settings {
uuid: Uuid,
ret: oneshot::Sender<Result<Settings>>,
ret: oneshot::Sender<IndexResult<Settings>>,
},
Documents {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
offset: usize,
limit: usize,
ret: oneshot::Sender<Result<Vec<Document>>>,
ret: oneshot::Sender<IndexResult<Vec<Document>>>,
},
Document {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
doc_id: String,
ret: oneshot::Sender<Result<Document>>,
ret: oneshot::Sender<IndexResult<Document>>,
},
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
ret: oneshot::Sender<IndexResult<()>>,
},
GetMeta {
uuid: Uuid,
ret: oneshot::Sender<Result<IndexMeta>>,
ret: oneshot::Sender<IndexResult<IndexMeta>>,
},
UpdateIndex {
uuid: Uuid,
index_settings: IndexSettings,
ret: oneshot::Sender<Result<IndexMeta>>,
ret: oneshot::Sender<IndexResult<IndexMeta>>,
},
Snapshot {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
ret: oneshot::Sender<IndexResult<()>>,
},
GetStats {
uuid: Uuid,
ret: oneshot::Sender<Result<IndexStats>>,
ret: oneshot::Sender<IndexResult<IndexStats>>,
},
}


@ -1,3 +1,4 @@
use std::fs::File;
use std::path::PathBuf;
use chrono::{DateTime, Utc};
@ -8,16 +9,13 @@ use thiserror::Error;
use uuid::Uuid;
use actor::IndexActor;
pub use actor::CONCURRENT_INDEX_MSG;
pub use handle_impl::IndexActorHandleImpl;
use message::IndexMsg;
use store::{IndexStore, MapIndexStore};
use crate::index::UpdateResult as UResult;
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{
updates::{Failed, Processed, Processing},
IndexStats, UpdateMeta,
};
use crate::index_controller::{Failed, Processed, Processing, IndexStats};
use super::IndexSettings;
@ -26,8 +24,7 @@ mod handle_impl;
mod message;
mod store;
pub type Result<T> = std::result::Result<T, IndexError>;
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
pub type IndexResult<T> = std::result::Result<T, IndexError>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
@ -38,20 +35,16 @@ pub struct IndexMeta {
}
impl IndexMeta {
fn new(index: &Index) -> Result<Self> {
fn new(index: &Index) -> IndexResult<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
fn new_txn(index: &Index, txn: &heed::RoTxn) -> IndexResult<Self> {
let created_at = index.created_at(&txn)?;
let updated_at = index.updated_at(&txn)?;
let primary_key = index.primary_key(&txn)?.map(String::from);
Ok(Self {
primary_key,
updated_at,
created_at,
})
Ok(Self { created_at, updated_at, primary_key })
}
}
@ -72,14 +65,16 @@ pub enum IndexError {
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait IndexActorHandle {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>)
-> IndexResult<IndexMeta>;
async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult>;
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult>;
async fn settings(&self, uuid: Uuid) -> Result<Settings>;
uuid: Uuid,
meta: Processing,
data: Option<File>,
) -> anyhow::Result<Result<Processed, Failed>>;
async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult<SearchResult>;
async fn settings(&self, uuid: Uuid) -> IndexResult<Settings>;
async fn documents(
&self,
@ -87,16 +82,103 @@ pub trait IndexActorHandle {
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>>;
) -> IndexResult<Vec<Document>>;
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats>;
) -> IndexResult<Document>;
async fn delete(&self, uuid: Uuid) -> IndexResult<()>;
async fn get_index_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta>;
async fn update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats>;
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use super::*;
#[async_trait::async_trait]
/// Useful for passing around an `Arc<MockIndexActorHandle>` in tests.
impl IndexActorHandle for Arc<MockIndexActorHandle> {
async fn create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
self.as_ref().create_index(uuid, primary_key).await
}
async fn update(
&self,
uuid: Uuid,
meta: Processing,
data: Option<std::fs::File>,
) -> anyhow::Result<Result<Processed, Failed>> {
self.as_ref().update(uuid, meta, data).await
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> IndexResult<SearchResult> {
self.as_ref().search(uuid, query).await
}
async fn settings(&self, uuid: Uuid) -> IndexResult<Settings> {
self.as_ref().settings(uuid).await
}
async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Vec<Document>> {
self.as_ref()
.documents(uuid, offset, limit, attributes_to_retrieve)
.await
}
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> IndexResult<Document> {
self.as_ref()
.document(uuid, doc_id, attributes_to_retrieve)
.await
}
async fn delete(&self, uuid: Uuid) -> IndexResult<()> {
self.as_ref().delete(uuid).await
}
async fn get_index_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
self.as_ref().get_index_meta(uuid).await
}
async fn update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> IndexResult<IndexMeta> {
self.as_ref().update_index(uuid, index_settings).await
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
self.as_ref().snapshot(uuid, path).await
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
self.as_ref().get_index_stats(uuid).await
}
}
}


@ -8,16 +8,16 @@ use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::{IndexError, Result};
use super::{IndexError, IndexResult};
use crate::index::Index;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait]
pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index>;
async fn get(&self, uuid: Uuid) -> IndexResult<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>>;
}
pub struct MapIndexStore {
@ -40,14 +40,14 @@ impl MapIndexStore {
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> IndexResult<Index> {
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
}
let index_size = self.index_size;
let index = spawn_blocking(move || -> Result<Index> {
let index = spawn_blocking(move || -> IndexResult<Index> {
let index = open_index(&path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
@ -64,7 +64,7 @@ impl IndexStore for MapIndexStore {
Ok(index)
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
async fn get(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
@ -86,7 +86,7 @@ impl IndexStore for MapIndexStore {
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path)
.await
@ -96,7 +96,7 @@ impl IndexStore for MapIndexStore {
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> Result<Index> {
fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> {
std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?;
let mut options = EnvOpenOptions::new();
options.map_size(size);


@ -1,28 +1,26 @@
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use actix_web::web::{Bytes, Payload};
use anyhow::bail;
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;
use log::info;
use milli::update::{IndexDocumentsMethod, UpdateFormat};
use milli::FieldsDistribution;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::sleep;
use uuid::Uuid;
pub use updates::*;
use index_actor::IndexActorHandle;
use snapshot::load_snapshot;
use snapshot::SnapshotService;
use snapshot::{SnapshotService, load_snapshot};
use update_actor::UpdateActorHandle;
pub use updates::{Failed, Processed, Processing};
use uuid_resolver::UuidError;
use uuid_resolver::UuidResolverHandle;
use uuid_resolver::{UuidError, UuidResolverHandle};
use crate::index::{Document, SearchQuery, SearchResult};
use crate::index::{Facets, Settings, UpdateResult};
use crate::index::{Settings, Document, SearchQuery, SearchResult};
use crate::option::Opt;
mod index_actor;
@ -32,42 +30,33 @@ mod update_handler;
mod updates;
mod uuid_resolver;
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMetadata {
#[serde(skip)]
pub uuid: Uuid,
pub uid: String,
name: String,
#[serde(flatten)]
pub meta: index_actor::IndexMeta,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
DocumentsAddition {
method: IndexDocumentsMethod,
format: UpdateFormat,
primary_key: Option<String>,
},
ClearDocuments,
DeleteDocuments,
Settings(Settings),
Facets(Facets),
}
#[derive(Clone, Debug)]
pub struct IndexSettings {
pub uid: Option<String>,
pub primary_key: Option<String>,
}
#[derive(Clone, Debug)]
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
pub is_indexing: bool,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since only the `UpdateStore` knows which index is currently indexing. It is
/// later set to either true or false when we retrieve the information from the `UpdateStore`.
pub is_indexing: Option<bool>,
pub fields_distribution: FieldsDistribution,
}
@ -77,6 +66,14 @@ pub struct IndexController {
update_handle: update_actor::UpdateActorHandleImpl<Bytes>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
pub database_size: u64,
pub last_update: Option<DateTime<Utc>>,
pub indexes: BTreeMap<String, IndexStats>,
}
impl IndexController {
pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
let index_size = options.max_mdb_size.get_bytes() as usize;
@ -166,6 +163,8 @@ impl IndexController {
Err(UuidError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?;
// ignore if index creation fails now, since it may already have been created
let _ = self.index_handle.create_index(uuid, None).await;
self.uuid_resolver.insert(name, uuid).await?;
Ok(status)
}
@ -218,6 +217,8 @@ impl IndexController {
Err(UuidError::UnexistingIndex(name)) if create => {
let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?;
// ignore if index creation fails now, since it may already have been created
let _ = self.index_handle.create_index(uuid, None).await;
self.uuid_resolver.insert(name, uuid).await?;
Ok(status)
}
@ -233,8 +234,8 @@ impl IndexController {
let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?;
let uuid = self.uuid_resolver.create(uid.clone()).await?;
let meta = self.index_handle.create_index(uuid, primary_key).await?;
let _ = self.update_handle.create(uuid).await?;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
uid,
meta,
@ -270,6 +271,7 @@ impl IndexController {
for (uid, uuid) in uuids {
let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
uid,
meta,
@ -327,6 +329,7 @@ impl IndexController {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.update_index(uuid, index_settings).await?;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
uid,
meta,
@ -344,6 +347,7 @@ impl IndexController {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
uid,
meta,
@ -351,21 +355,44 @@ impl IndexController {
Ok(meta)
}
pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
Ok(self.index_handle.get_index_stats(uuid).await?)
}
pub async fn get_updates_size(&self, uid: String) -> anyhow::Result<u64> {
let uuid = self.uuid_resolver.get(uid.clone()).await?;
Ok(self.update_handle.get_size(uuid).await?)
}
pub async fn get_uuids_size(&self) -> anyhow::Result<u64> {
Ok(self.uuid_resolver.get_size().await?)
}
pub async fn get_index_stats(&self, uid: String) -> anyhow::Result<IndexStats> {
let uuid = self.uuid_resolver.get(uid).await?;
let update_infos = self.update_handle.get_info().await?;
let mut stats = self.index_handle.get_index_stats(uuid).await?;
// Check if the currently indexing update is from our index.
stats.is_indexing = Some(Some(uuid) == update_infos.processing);
Ok(stats)
}
pub async fn get_all_stats(&self) -> anyhow::Result<Stats> {
let update_infos = self.update_handle.get_info().await?;
let mut database_size = self.get_uuids_size().await? + update_infos.size;
let mut last_update: Option<DateTime<_>> = None;
let mut indexes = BTreeMap::new();
for index in self.list_indexes().await? {
let mut index_stats = self.index_handle.get_index_stats(index.uuid).await?;
database_size += index_stats.size;
last_update = last_update.map_or(Some(index.meta.updated_at), |last| {
Some(last.max(index.meta.updated_at))
});
index_stats.is_indexing = Some(Some(index.uuid) == update_infos.processing);
indexes.insert(index.uid, index_stats);
}
Ok(Stats {
database_size,
last_update,
indexes,
})
}
}
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {


@ -71,16 +71,9 @@ where
return Ok(());
}
let tasks = uuids
.iter()
.map(|&uuid| {
self.update_handle
.snapshot(uuid, temp_snapshot_path.clone())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await?;
self.update_handle
.snapshot(uuids, temp_snapshot_path.clone())
.await?;
let snapshot_dir = self.snapshot_path.clone();
let snapshot_path = self
.snapshot_path
@ -138,20 +131,28 @@ pub fn load_snapshot(
#[cfg(test)]
mod test {
use std::iter::FromIterator;
use std::{collections::HashSet, sync::Arc};
use futures::future::{err, ok};
use rand::Rng;
use tokio::time::timeout;
use uuid::Uuid;
use super::*;
use crate::index_controller::update_actor::{MockUpdateActorHandle, UpdateError};
use crate::index_controller::index_actor::MockIndexActorHandle;
use crate::index_controller::update_actor::{
MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError,
};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
#[actix_rt::test]
async fn test_normal() {
let mut rng = rand::thread_rng();
let uuids_num = rng.gen_range(5, 10);
let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::<Vec<_>>();
let uuids_num: usize = rng.gen_range(5, 10);
let uuids = (0..uuids_num)
.map(|_| Uuid::new_v4())
.collect::<HashSet<_>>();
let mut uuid_resolver = MockUuidResolverHandle::new();
let uuids_clone = uuids.clone();
@ -160,14 +161,19 @@ mod test {
.times(1)
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
let mut update_handle = MockUpdateActorHandle::new();
let uuids_clone = uuids.clone();
update_handle
let mut index_handle = MockIndexActorHandle::new();
index_handle
.expect_snapshot()
.withf(move |uuid, _path| uuids_clone.contains(uuid))
.times(uuids_num)
.returning(move |_, _| Box::pin(ok(())));
let dir = tempfile::tempdir_in(".").unwrap();
let handle = Arc::new(index_handle);
let update_handle =
UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap();
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
@ -212,7 +218,7 @@ mod test {
uuid_resolver
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(vec![uuid])));
.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid)))));
let mut update_handle = MockUpdateActorHandle::new();
update_handle


@ -1,46 +1,50 @@
use std::collections::HashSet;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::StreamExt;
use log::info;
use oxidized_json_checker::JsonChecker;
use tokio::fs;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::io::AsyncWriteExt;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus};
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG};
use crate::index_controller::{UpdateMeta, UpdateStatus};
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore};
pub struct UpdateActor<D, S, I> {
pub struct UpdateActor<D, I> {
path: PathBuf,
store: S,
store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg<D>>,
index_handle: I,
}
impl<D, S, I> UpdateActor<D, S, I>
impl<D, I> UpdateActor<D, I>
where
D: AsRef<[u8]> + Sized + 'static,
S: UpdateStoreStore,
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
pub fn new(
store: S,
update_db_size: usize,
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
index_handle: I,
) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned();
let path = path.as_ref().join("updates");
std::fs::create_dir_all(&path)?;
let mut options = heed::EnvOpenOptions::new();
options.map_size(update_db_size);
let store = UpdateStore::open(options, &path, index_handle.clone())?;
std::fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists());
Ok(Self {
store,
inbox,
path,
index_handle,
})
Ok(Self { path, store, inbox, index_handle })
}
pub async fn run(mut self) {
@ -67,14 +71,11 @@ where
Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Some(Create { uuid, ret }) => {
let _ = ret.send(self.handle_create(uuid).await);
Some(Snapshot { uuids, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
}
Some(Snapshot { uuid, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Some(GetSize { uuid, ret }) => {
let _ = ret.send(self.handle_get_size(uuid).await);
Some(GetInfo { ret }) => {
let _ = ret.send(self.handle_get_info().await);
}
None => break,
}
@ -87,52 +88,64 @@ where
meta: UpdateMeta,
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> {
let update_store = self.store.get_or_create(uuid).await?;
let update_file_id = uuid::Uuid::new_v4();
let path = self
.path
.join(format!("update_files/update_{}", update_file_id));
let mut file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
while let Some(bytes) = payload.recv().await {
match bytes {
Ok(bytes) => {
file.write_all(bytes.as_ref())
let file_path = match meta {
UpdateMeta::DocumentsAddition { .. }
| UpdateMeta::DeleteDocuments => {
let update_file_id = uuid::Uuid::new_v4();
let path = self
.path
.join(format!("update_files/update_{}", update_file_id));
let mut file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
match bytes {
Ok(bytes) => {
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref())
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
}
}
}
if file_len != 0 {
file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
let file = file.into_std().await;
Some((file, path))
} else {
// empty update, delete the empty file.
fs::remove_file(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
None
}
}
}
_ => None
};
file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let mut file = file.into_std().await;
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || {
use std::io::{copy, sink, BufReader, Seek};
// If the payload is empty, ignore the check.
if file
.metadata()
.map_err(|e| UpdateError::Error(Box::new(e)))?
.len()
> 0
{
let path = if let Some((mut file, path)) = file_path {
// set the file back to the beginning
file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?;
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
@ -144,7 +157,10 @@ where
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
}
Some(path)
} else {
None
};
// The payload is valid, we can register it to the update store.
update_store
@ -157,11 +173,10 @@ where
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.get(uuid).await?;
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || {
let result = update_store
.ok_or(UpdateError::UnexistingIndex(uuid))?
.list()
.list(uuid)
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(result)
})
@ -170,77 +185,64 @@ where
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self
.store
.get(uuid)
.await?
.ok_or(UpdateError::UnexistingIndex(uuid))?;
let store = self.store.clone();
let result = store
.meta(id)
.meta(uuid, id)
.map_err(|e| UpdateError::Error(Box::new(e)))?
.ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.delete(uuid).await?;
let store = self.store.clone();
if let Some(store) = store {
tokio::task::spawn(async move {
let store = get_arc_ownership_blocking(store).await;
tokio::task::spawn_blocking(move || {
store.prepare_for_closing().wait();
info!("Update store {} was closed.", uuid);
});
});
}
Ok(())
}
async fn handle_create(&self, uuid: Uuid) -> Result<()> {
let _ = self.store.get_or_create(uuid).await?;
Ok(())
}
async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
if let Some(update_store) = self.store.get(uuid).await? {
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
// acquire write lock to prevent further writes during snapshot
// the update lock must be acquired BEFORE the write lock to prevent deadlock
let _lock = update_store.update_lock.lock();
let mut txn = update_store.env.write_txn()?;
// create db snapshot
update_store.snapshot(&mut txn, &path, uuid)?;
futures::executor::block_on(
async move { index_handle.snapshot(uuid, path).await },
)?;
Ok(())
})
tokio::task::spawn_blocking(move || store.delete_all(uuid))
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
}
Ok(())
}
async fn handle_get_size(&self, uuid: Uuid) -> Result<u64> {
let size = match self.store.get(uuid).await? {
Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result<u64> {
let txn = update_store.env.read_txn()?;
async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
update_store.snapshot(&uuids, &path)?;
update_store.get_size(&txn)
// Perform the snapshot of each index concurrently. Only a third of the capacity of
// the index actor is used at a time, so as not to put too much pressure on it.
let path = &path;
let handle = &index_handle;
let mut stream = futures::stream::iter(uuids.iter())
.map(|&uuid| handle.snapshot(uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(())
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?,
None => 0,
};
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(size)
Ok(())
}
async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
let update_store = self.store.clone();
let info = tokio::task::spawn_blocking(move || -> anyhow::Result<UpdateStoreInfo> {
let info = update_store.get_info()?;
Ok(info)
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(info)
}
}


@ -1,13 +1,13 @@
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::index_controller::IndexActorHandle;
use crate::index_controller::{IndexActorHandle, UpdateStatus};
use super::{
MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
UpdateMsg, UpdateStatus,
PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo,
};
#[derive(Clone)]
@ -27,10 +27,9 @@ where
where
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
let path = path.as_ref().to_owned().join("updates");
let path = path.as_ref().to_owned();
let (sender, receiver) = mpsc::channel(100);
let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size);
let actor = UpdateActor::new(store, receiver, path, index_handle)?;
let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?;
tokio::task::spawn(actor.run());
@ -65,23 +64,16 @@ where
receiver.await.expect("update actor killed.")
}
async fn create(&self, uuid: Uuid) -> Result<()> {
async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Create { uuid, ret };
let msg = UpdateMsg::Snapshot { uuids, path, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
async fn get_info(&self) -> Result<UpdateStoreInfo> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Snapshot { uuid, path, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn get_size(&self, uuid: Uuid) -> Result<u64> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetSize { uuid, ret };
let msg = UpdateMsg::GetInfo { ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}


@ -1,9 +1,10 @@
use std::collections::HashSet;
use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::{PayloadData, Result, UpdateMeta, UpdateStatus};
use super::{PayloadData, Result, UpdateMeta, UpdateStatus, UpdateStoreInfo};
pub enum UpdateMsg<D> {
Update {
@ -25,17 +26,12 @@ pub enum UpdateMsg<D> {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
Create {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
Snapshot {
uuid: Uuid,
uuids: HashSet<Uuid>,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
GetSize {
uuid: Uuid,
ret: oneshot::Sender<Result<u64>>,
GetInfo {
ret: oneshot::Sender<Result<UpdateStoreInfo>>,
},
}


@ -1,26 +1,24 @@
mod actor;
mod handle_impl;
mod message;
mod store;
mod update_store;
use std::path::PathBuf;
use std::{collections::HashSet, path::PathBuf};
use thiserror::Error;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor;
use message::UpdateMsg;
use store::{MapUpdateStoreStore, UpdateStoreStore};
use update_store::UpdateStore;
pub use update_store::UpdateStoreInfo;
pub use handle_impl::UpdateActorHandleImpl;
pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;
#[cfg(test)]
@ -30,8 +28,6 @@ use mockall::automock;
pub enum UpdateError {
#[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Index {0} doesn't exist.")]
UnexistingIndex(Uuid),
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
}
@ -44,9 +40,8 @@ pub trait UpdateActorHandle {
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn create(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn get_size(&self, uuid: Uuid) -> Result<u64>;
async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn update(
&self,
meta: UpdateMeta,


@ -1,111 +0,0 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use uuid::Uuid;
use super::{Result, UpdateError, UpdateStore};
use crate::index_controller::IndexActorHandle;
#[async_trait::async_trait]
pub trait UpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
}
pub struct MapUpdateStoreStore<I> {
db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>,
index_handle: I,
path: PathBuf,
update_store_size: usize,
}
impl<I: IndexActorHandle> MapUpdateStoreStore<I> {
pub fn new(index_handle: I, path: impl AsRef<Path>, update_store_size: usize) -> Self {
let db = Arc::new(RwLock::new(HashMap::new()));
let path = path.as_ref().to_owned();
Self {
db,
index_handle,
path,
update_store_size,
}
}
}
#[async_trait::async_trait]
impl<I> UpdateStoreStore for MapUpdateStoreStore<I>
where
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>> {
match self.db.write().await.entry(uuid) {
Entry::Vacant(e) => {
let mut options = heed::EnvOpenOptions::new();
let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let path = self.path.clone().join(format!("updates-{}", e.key()));
fs::create_dir_all(&path).await.unwrap();
let index_handle = self.index_handle.clone();
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = e.insert(store);
Ok(store.clone())
}
Entry::Occupied(e) => Ok(e.get().clone()),
}
}
async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let guard = self.db.read().await;
match guard.get(&uuid) {
Some(uuid) => Ok(Some(uuid.clone())),
None => {
// The index is not found in the loaded indexes, so we attempt to load
// it from disk. We need to acquire a write lock **before** attempting to open the
// index, because someone could be trying to open it at the same time as us.
drop(guard);
let path = self.path.clone().join(format!("updates-{}", uuid));
if path.exists() {
let mut guard = self.db.write().await;
match guard.entry(uuid) {
Entry::Vacant(entry) => {
// We can safely load the index
let index_handle = self.index_handle.clone();
let mut options = heed::EnvOpenOptions::new();
let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = entry.insert(store);
Ok(Some(store.clone()))
}
Entry::Occupied(entry) => {
// The index was loaded while we were attempting to open it
Ok(Some(entry.get().clone()))
}
}
} else {
Ok(None)
}
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let store = self.db.write().await.remove(&uuid);
let path = self.path.clone().join(format!("updates-{}", uuid));
if store.is_some() || path.exists() {
fs::remove_dir_all(path).await.unwrap();
}
Ok(store)
}
}

View File

@ -1,99 +1,208 @@
use std::fs::File;
use std::fs::{copy, create_dir_all, remove_file};
use std::path::{Path, PathBuf};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::fs::{copy, create_dir_all, remove_file, File};
use std::mem::size_of;
use std::path::Path;
use std::sync::Arc;
use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use anyhow::Context;
use arc_swap::ArcSwap;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use uuid::Uuid;
use super::UpdateMeta;
use crate::helpers::EnvSizer;
use crate::index_controller::updates::*;
use crate::index_controller::{IndexActorHandle, updates::*};
#[allow(clippy::upper_case_acronyms)]
type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
type BEU64 = U64<heed::byteorder::BE>;
#[derive(Clone)]
pub struct UpdateStore<M, N, E> {
pub env: Env,
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Enqueued<M>>>,
pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
processing: Arc<RwLock<Option<Processing<M>>>>,
notification_sender: mpsc::Sender<()>,
/// A lock on the update loop. This is meant to prevent a snapshot from occurring while an
/// update is being processed, without preventing writes altogether during an update.
pub update_lock: Arc<Mutex<()>>,
struct NextIdCodec;
enum NextIdKey {
Global,
Index(Uuid),
}
pub trait HandleUpdate<M, N, E> {
fn handle_update(
&mut self,
meta: Processing<M>,
content: File,
) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>;
pub struct UpdateStoreInfo {
/// Size of the update store in bytes.
pub size: u64,
/// Uuid of the currently processing update, if any.
pub processing: Option<Uuid>,
}
impl<M, N, E, F> HandleUpdate<M, N, E> for F
where
F: FnMut(Processing<M>, File) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>,
{
fn handle_update(
&mut self,
meta: Processing<M>,
content: File,
) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>> {
self(meta, content)
/// A data structure that allows concurrent reads AND exactly one writer.
pub struct StateLock {
lock: Mutex<()>,
data: ArcSwap<State>,
}
struct StateLockGuard<'a> {
_lock: MutexGuard<'a, ()>,
state: &'a StateLock,
}
impl StateLockGuard<'_> {
fn swap(&self, state: State) -> Arc<State> {
self.state.data.swap(Arc::new(state))
}
}
impl<M, N, E> UpdateStore<M, N, E>
where
M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone,
N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
{
pub fn open<P, U>(
impl StateLock {
fn from_state(state: State) -> Self {
let lock = Mutex::new(());
let data = ArcSwap::from(Arc::new(state));
Self { lock, data }
}
fn read(&self) -> Arc<State> {
self.data.load().clone()
}
fn write(&self) -> StateLockGuard {
let _lock = self.lock.lock();
let state = &self;
StateLockGuard { _lock, state }
}
}
#[allow(clippy::large_enum_variant)]
pub enum State {
Idle,
Processing(Uuid, Processing),
Snapshoting,
}
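
A minimal usage sketch of the StateLock above (not part of the commit; it assumes the StateLock, State, Processing, and Uuid types already in scope in this file):

// Readers load the current state lock-free through the ArcSwap.
fn current_uuid(state: &StateLock) -> Option<Uuid> {
    match *state.read() {
        State::Processing(uuid, _) => Some(uuid),
        _ => None,
    }
}

// A single writer holds the mutex guard for the whole task and swaps back when done.
fn run_exclusive(state: &StateLock, uuid: Uuid, processing: Processing) {
    let guard = state.write();
    guard.swap(State::Processing(uuid, processing));
    // ... do the work while readers still observe a consistent state ...
    guard.swap(State::Idle);
    // The mutex is released when `guard` is dropped here.
}
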
impl<'a> BytesEncode<'a> for NextIdCodec {
type EItem = NextIdKey;
fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
match item {
NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
}
}
}
struct PendingKeyCodec;
impl<'a> BytesEncode<'a> for PendingKeyCodec {
type EItem = (u64, Uuid, u64);
fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(&global_id.to_be_bytes());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
type DItem = (u64, Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
let global_id = u64::from_be_bytes(global_id_bytes);
let uuid_bytes = bytes
.get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
.try_into()
.ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes
.get((size_of::<u64>() + size_of::<Uuid>())..)?
.try_into()
.ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((global_id, uuid, update_id))
}
}
struct UpdateKeyCodec;
impl<'a> BytesEncode<'a> for UpdateKeyCodec {
type EItem = (Uuid, u64);
fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
type DItem = (Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((uuid, update_id))
}
}
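
A quick sanity sketch for the codecs above (not part of the commit): every integer is encoded big-endian, so the lexicographic byte order LMDB sorts keys by agrees with the numeric order of the leading global update id, which is what keeps the pending queue globally FIFO.

// Sketch; assumes the PendingKeyCodec defined above is in scope.
fn codec_round_trip() {
    let uuid = Uuid::new_v4();
    // Encoding is a plain concatenation: 8-byte BE id, 16-byte uuid, 8-byte BE id.
    let k0 = PendingKeyCodec::bytes_encode(&(0, uuid, 0)).unwrap();
    let k1 = PendingKeyCodec::bytes_encode(&(1, uuid, 1)).unwrap();
    // Big-endian integers make byte order match the numeric order of the global id.
    assert!(k0 < k1);
    // Decoding is the exact inverse of encoding.
    assert_eq!(PendingKeyCodec::bytes_decode(&k0), Some((0, uuid, 0)));
}
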
#[derive(Clone)]
pub struct UpdateStore {
pub env: Env,
/// A queue containing the updates to process, ordered by arrival.
/// Keys are built as follows:
/// | global_update_id | index_uuid | update_id |
/// | 8-bytes | 16-bytes | 8-bytes |
pending_queue: Database<PendingKeyCodec, SerdeJson<Enqueued>>,
/// Maps each index to the next available update id. If NextIdKey::Global is queried, the next
/// global update id is returned.
next_update_id: Database<NextIdCodec, OwnedType<BEU64>>,
/// Contains the meta of all performed updates, be they failed, aborted, or processed.
/// Keys are built as follows:
/// | Uuid | id |
/// | 16-bytes | 8-bytes |
updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
/// Indicates the current state of the update store.
state: Arc<StateLock>,
/// Wake up the loop when a new event occurs.
notification_sender: mpsc::Sender<()>,
}
impl UpdateStore {
pub fn open(
mut options: EnvOpenOptions,
path: P,
update_handler: U,
) -> heed::Result<Arc<Self>>
where
P: AsRef<Path>,
U: HandleUpdate<M, N, E> + Sync + Clone + Send + 'static,
{
path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
) -> anyhow::Result<Arc<Self>> {
options.max_dbs(5);
let env = options.open(path)?;
let pending_meta = env.create_database(Some("pending-meta"))?;
let pending = env.create_database(Some("pending"))?;
let processed_meta = env.create_database(Some("processed-meta"))?;
let aborted_meta = env.create_database(Some("aborted-meta"))?;
let failed_meta = env.create_database(Some("failed-meta"))?;
let processing = Arc::new(RwLock::new(None));
let pending_queue = env.create_database(Some("pending-queue"))?;
let next_update_id = env.create_database(Some("next-update-id"))?;
let updates = env.create_database(Some("updates"))?;
let (notification_sender, mut notification_receiver) = mpsc::channel(10);
// Send a first notification to trigger the process.
let _ = notification_sender.send(());
let update_lock = Arc::new(Mutex::new(()));
let state = Arc::new(StateLock::from_state(State::Idle));
let update_store = Arc::new(UpdateStore {
env,
pending,
pending_meta,
processed_meta,
aborted_meta,
notification_sender,
failed_meta,
processing,
update_lock,
});
// Init update loop to perform any pending updates at launch.
// Since we just launched the update store, and we still own the receiving end of the
// channel, this call is guaranteed to succeed.
notification_sender
.try_send(())
.expect("Failed to init update store");
let update_store = Arc::new(UpdateStore { env, pending_queue, next_update_id, updates, state, notification_sender });
// We need a weak reference so we can take ownership of the arc later when we
// want to close the index.
@ -104,7 +213,7 @@ where
loop {
match update_store_weak.upgrade() {
Some(update_store) => {
let handler = update_handler.clone();
let handler = index_handle.clone();
let res = tokio::task::spawn_blocking(move || {
update_store.process_pending_update(handler)
})
@ -126,115 +235,113 @@ where
Ok(update_store)
}
pub fn prepare_for_closing(self) -> heed::EnvClosingEvent {
self.env.prepare_for_closing()
}
/// Returns the next global update id and the next update id for a given `index_uuid`.
fn next_update_id(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<(u64, u64)> {
let global_id = self
.next_update_id
.get(txn, &NextIdKey::Global)?
.map(U64::get)
.unwrap_or_default();
let update_id = self
.next_update_id
.get(txn, &NextIdKey::Index(index_uuid))?
.map(U64::get)
.unwrap_or_default();
/// Returns the next available id to use to store the new update.
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
let last_pending = self
.pending_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
self.next_update_id
.put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
self.next_update_id.put(
txn,
&NextIdKey::Index(index_uuid),
&BEU64::new(update_id + 1),
)?;
let last_processed = self
.processed_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
let last_aborted = self
.aborted_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
let last_update_id = [last_pending, last_processed, last_aborted]
.iter()
.copied()
.flatten()
.max();
match last_update_id {
Some(last_id) => Ok(last_id + 1),
None => Ok(0),
}
Ok((global_id, update_id))
}
/// Registers the update content in the pending store and the meta
/// into the pending-meta store. Returns the new unique update id.
pub fn register_update(
&self,
meta: M,
content: impl AsRef<Path>,
meta: UpdateMeta,
content: Option<impl AsRef<Path>>,
index_uuid: Uuid,
) -> heed::Result<Enqueued<M>> {
let mut wtxn = self.env.write_txn()?;
) -> heed::Result<Enqueued> {
let mut txn = self.env.write_txn()?;
// We ask the update store to give us a new update id; this is safe because
// we hold a write txn while asking for the id and registering the update,
// so any other registration is forced to wait for a new write txn.
let update_id = self.new_update_id(&wtxn)?;
let update_key = BEU64::new(update_id);
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
let meta = Enqueued::new(meta, update_id, index_uuid);
self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
self.pending
.put(&mut wtxn, &update_key, &content.as_ref().to_owned())?;
self.pending_queue
.put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
wtxn.commit()?;
txn.commit()?;
self.notification_sender
.blocking_send(())
.expect("Update store loop exited.");
Ok(meta)
}
/// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it lets the user process the update with a read-only txn, and
/// writes the result meta to the processed-meta store only *after* it has been processed.
fn process_pending_update<U>(&self, mut handler: U) -> anyhow::Result<Option<()>>
where
U: HandleUpdate<M, N, E>,
{
let _lock = self.update_lock.lock();
fn process_pending_update(
&self,
index_handle: impl IndexActorHandle,
) -> anyhow::Result<Option<()>> {
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
let first_meta = self.pending_meta.first(&rtxn)?;
let first_meta = self.pending_queue.first(&rtxn)?;
drop(rtxn);
// If there is a pending update, we process it while holding only
// a reader, not a writer.
match first_meta {
Some((first_id, pending)) => {
let content_path = self
.pending
.get(&rtxn, &first_id)?
.expect("associated update content");
// We change the state of the update from pending to processing before we pass it
// to the update handler. The processing state is not persisted, so we can recover
// from a failure.
Some(((global_id, index_uuid, update_id), mut pending)) => {
let content_path = pending.content.take();
let processing = pending.processing();
self.processing.write().replace(processing.clone());
let file = File::open(&content_path)?;
// Acquire the state lock and set the current state to processing.
let state = self.state.write();
state.swap(State::Processing(index_uuid, processing.clone()));
let file = match content_path {
Some(ref path) => {
let file = File::open(path)
.with_context(|| format!("file at path: {:?}", &content_path))?;
Some(file)
}
None => None,
};
// Process the pending update using the provided user function.
let result = handler.handle_update(processing, file)?;
drop(rtxn);
let result = Handle::current()
.block_on(index_handle.update(index_uuid, processing, file))?;
// Once the pending update has been successfully processed
// we must remove the content from the pending and processing stores and
// write the *new* meta to the processed-meta store and commit.
let mut wtxn = self.env.write_txn()?;
self.processing.write().take();
self.pending_meta.delete(&mut wtxn, &first_id)?;
remove_file(&content_path)?;
self.pending.delete(&mut wtxn, &first_id)?;
match result {
Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
self.pending_queue
.delete(&mut wtxn, &(global_id, index_uuid, update_id))?;
if let Some(path) = content_path {
remove_file(&path)?;
}
let result = match result {
Ok(res) => res.into(),
Err(res) => res.into(),
};
self.updates.remap_key_type::<UpdateKeyCodec>().put(
&mut wtxn,
&(index_uuid, update_id),
&result,
)?;
wtxn.commit()?;
state.swap(State::Idle);
Ok(Some(()))
}
@ -242,187 +349,308 @@ where
}
}
pub fn list(&self) -> anyhow::Result<Vec<UpdateStatus<M, N, E>>> {
let rtxn = self.env.read_txn()?;
let mut updates = Vec::new();
/// List the updates for `index_uuid`.
pub fn list(&self, index_uuid: Uuid) -> anyhow::Result<Vec<UpdateStatus>> {
let mut update_list = BTreeMap::<u64, UpdateStatus>::new();
let processing = self.processing.read();
if let Some(ref processing) = *processing {
let update = UpdateStatus::from(processing.clone());
updates.push(update);
}
let txn = self.env.read_txn()?;
let pending = self
.pending_meta
.iter(&rtxn)?
.filter_map(Result::ok)
.filter_map(|(_, p)| (Some(p.id()) != processing.as_ref().map(|p| p.id())).then(|| p))
.map(UpdateStatus::from);
updates.extend(pending);
let aborted = self
.aborted_meta
.iter(&rtxn)?
.filter_map(Result::ok)
.map(|(_, p)| p)
.map(UpdateStatus::from);
updates.extend(aborted);
let processed = self
.processed_meta
.iter(&rtxn)?
.filter_map(Result::ok)
.map(|(_, p)| p)
.map(UpdateStatus::from);
updates.extend(processed);
let failed = self
.failed_meta
.iter(&rtxn)?
.filter_map(Result::ok)
.map(|(_, p)| p)
.map(UpdateStatus::from);
updates.extend(failed);
updates.sort_by_key(|u| u.id());
Ok(updates)
}
/// Returns the update's associated meta, or `None` if the update doesn't exist.
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<M, N, E>>> {
let rtxn = self.env.read_txn()?;
let key = BEU64::new(update_id);
if let Some(ref meta) = *self.processing.read() {
if meta.id() == update_id {
return Ok(Some(UpdateStatus::Processing(meta.clone())));
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings {
let ((_, uuid, id), pending) = entry?;
if uuid == index_uuid {
update_list.insert(id, pending.decode()?.into());
}
}
if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Enqueued(meta)));
let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
for entry in updates {
let (_, update) = entry?;
update_list.insert(update.id(), update);
}
if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Processed(meta)));
// If the currently processing update is from this index, replace the corresponding pending update with this one.
match *self.state.read() {
State::Processing(uuid, ref processing) if uuid == index_uuid => {
update_list.insert(processing.id(), processing.clone().into());
}
_ => (),
}
if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Aborted(meta)));
Ok(update_list.into_iter().map(|(_, v)| v).collect())
}
/// Returns the update's associated meta, or `None` if the update doesn't exist.
pub fn meta(&self, index_uuid: Uuid, update_id: u64) -> heed::Result<Option<UpdateStatus>> {
// Check if the update is the one currently being processed
match *self.state.read() {
State::Processing(uuid, ref processing)
if uuid == index_uuid && processing.id() == update_id =>
{
return Ok(Some(processing.clone().into()));
}
_ => (),
}
if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Failed(meta)));
let txn = self.env.read_txn()?;
// Else, check if it is in the updates database:
let update = self
.updates
.remap_key_type::<UpdateKeyCodec>()
.get(&txn, &(index_uuid, update_id))?;
if let Some(update) = update {
return Ok(Some(update));
}
// If nothing was found yet, we fall back to iterating over the pending queue.
let pendings = self
.pending_queue
.remap_key_type::<UpdateKeyCodec>()
.iter(&txn)?
.lazily_decode_data();
for entry in pendings {
let ((uuid, id), pending) = entry?;
if uuid == index_uuid && id == update_id {
return Ok(Some(pending.decode()?.into()));
}
}
// No update was found.
Ok(None)
}
/// Aborts an update: the aborted update's content is deleted and
/// its meta is moved into the aborted updates database.
///
/// Trying to abort an update that is currently being processed,
/// that has already been processed, or that doesn't actually exist
/// will return `None`.
#[allow(dead_code)]
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> {
let mut wtxn = self.env.write_txn()?;
let key = BEU64::new(update_id);
/// Delete all updates for an index from the update store.
pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
let mut txn = self.env.write_txn()?;
// Collects all the content file paths that need to be removed if the deletion is successful.
let mut paths_to_remove = Vec::new();
// We cannot abort an update that is currently being processed.
if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
return Ok(None);
let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
if uuid == index_uuid {
pendings.del_current()?;
let mut pending = pending.decode()?;
if let Some(path) = pending.content.take() {
paths_to_remove.push(path);
}
}
}
let pending = match self.pending_meta.get(&wtxn, &key)? {
Some(meta) => meta,
None => return Ok(None),
};
drop(pendings);
let aborted = pending.abort();
let mut updates = self
.updates
.prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
.lazily_decode_data();
self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
self.pending_meta.delete(&mut wtxn, &key)?;
self.pending.delete(&mut wtxn, &key)?;
while let Some(_) = updates.next() {
updates.del_current()?;
}
wtxn.commit()?;
drop(updates);
Ok(Some(aborted))
txn.commit()?;
paths_to_remove.iter().for_each(|path| {
let _ = remove_file(path);
});
// We don't care about the currently processing update, since it will be removed by itself
// once it's done processing, and we can't abort a running update.
Ok(())
}
/// Aborts all the pending updates, but not the one currently being processed.
/// Returns the update metas and ids that were successfully aborted.
#[allow(dead_code)]
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> {
let mut wtxn = self.env.write_txn()?;
let mut aborted_updates = Vec::new();
pub fn snapshot(&self, uuids: &HashSet<Uuid>, path: impl AsRef<Path>) -> anyhow::Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Snapshoting);
// We skip the first pending update as it is currently being processed.
for result in self.pending_meta.iter(&wtxn)?.skip(1) {
let (key, pending) = result?;
let id = key.get();
aborted_updates.push((id, pending.abort()));
}
let txn = self.env.write_txn()?;
for (id, aborted) in &aborted_updates {
let key = BEU64::new(*id);
self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
self.pending_meta.delete(&mut wtxn, &key)?;
self.pending.delete(&mut wtxn, &key)?;
}
wtxn.commit()?;
Ok(aborted_updates)
}
pub fn snapshot(
&self,
txn: &mut heed::RwTxn,
path: impl AsRef<Path>,
uuid: Uuid,
) -> anyhow::Result<()> {
let update_path = path.as_ref().join("updates");
create_dir_all(&update_path)?;
let mut snapshot_path = update_path.join(format!("update-{}", uuid));
// acquire write lock to prevent further writes during snapshot
create_dir_all(&snapshot_path)?;
snapshot_path.push("data.mdb");
create_dir_all(&update_path)?;
let db_path = update_path.join("data.mdb");
// create db snapshot
self.env
.copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files");
create_dir_all(&update_files_path)?;
for path in self.pending.iter(&txn)? {
let (_, path) = path?;
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) {
if let Some(path) = pending.decode()?.content_path() {
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
}
}
}
Ok(())
}
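
A usage note for snapshot() above (a sketch, not part of the commit; the destination path is hypothetical): the caller passes the set of index uuids to include, and the store copies both its env and the content files of pending updates under the destination.

// Sketch; `/tmp/meili-snapshot` is a hypothetical destination.
fn snapshot_example(store: &UpdateStore, index_uuid: Uuid) -> anyhow::Result<()> {
    let mut uuids = HashSet::new();
    uuids.insert(index_uuid);
    // snapshot() creates the directories it needs under the destination.
    store.snapshot(&uuids, "/tmp/meili-snapshot")?;
    // Expected layout afterwards:
    //   /tmp/meili-snapshot/updates/data.mdb        (compacted copy of this env)
    //   /tmp/meili-snapshot/updates/update_files/*  (content of pending updates)
    Ok(())
}
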
pub fn get_size(&self, txn: &heed::RoTxn) -> anyhow::Result<u64> {
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
let mut size = self.env.size();
let txn = self.env.read_txn()?;
for path in self.pending.iter(txn)? {
let (_, path) = path?;
if let Ok(metadata) = path.metadata() {
size += metadata.len()
for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?;
if let Some(path) = pending.content_path() {
size += File::open(path)?.metadata()?.len();
}
}
Ok(size)
let processing = match *self.state.read() {
State::Processing(uuid, _) => Some(uuid),
_ => None,
};
Ok(UpdateStoreInfo { size, processing })
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::index_controller::{index_actor::MockIndexActorHandle, UpdateResult};
use futures::future::ok;
#[actix_rt::test]
async fn test_next_id() {
let dir = tempfile::tempdir_in(".").unwrap();
let mut options = EnvOpenOptions::new();
let handle = Arc::new(MockIndexActorHandle::new());
options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir.path(), handle).unwrap();
let index1_uuid = Uuid::new_v4();
let index2_uuid = Uuid::new_v4();
let mut txn = update_store.env.write_txn().unwrap();
let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
txn.commit().unwrap();
assert_eq!((0, 0), ids);
let mut txn = update_store.env.write_txn().unwrap();
let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
txn.commit().unwrap();
assert_eq!((1, 0), ids);
let mut txn = update_store.env.write_txn().unwrap();
let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
txn.commit().unwrap();
assert_eq!((2, 1), ids);
}
#[actix_rt::test]
async fn test_register_update() {
let dir = tempfile::tempdir_in(".").unwrap();
let mut options = EnvOpenOptions::new();
let handle = Arc::new(MockIndexActorHandle::new());
options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir.path(), handle).unwrap();
let meta = UpdateMeta::ClearDocuments;
let uuid = Uuid::new_v4();
let store_clone = update_store.clone();
tokio::task::spawn_blocking(move || {
store_clone
.register_update(meta, Some("here"), uuid)
.unwrap();
})
.await
.unwrap();
let txn = update_store.env.read_txn().unwrap();
assert!(update_store
.pending_queue
.get(&txn, &(0, uuid, 0))
.unwrap()
.is_some());
}
#[actix_rt::test]
async fn test_process_update() {
let dir = tempfile::tempdir_in(".").unwrap();
let mut handle = MockIndexActorHandle::new();
handle
.expect_update()
.times(2)
.returning(|_index_uuid, processing, _file| {
if processing.id() == 0 {
Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
} else {
Box::pin(ok(Err(processing.fail(String::from("err")))))
}
});
let handle = Arc::new(handle);
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let store = UpdateStore::open(options, dir.path(), handle.clone()).unwrap();
// wait a bit for the event loop to exit.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut txn = store.env.write_txn().unwrap();
let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
let uuid = Uuid::new_v4();
store
.pending_queue
.put(&mut txn, &(0, uuid, 0), &update)
.unwrap();
let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
store
.pending_queue
.put(&mut txn, &(1, uuid, 1), &update)
.unwrap();
txn.commit().unwrap();
// Process the pending updates, and check that they have been moved to the updates
// database and removed from the pending queue.
let store_clone = store.clone();
tokio::task::spawn_blocking(move || {
store_clone.process_pending_update(handle.clone()).unwrap();
store_clone.process_pending_update(handle).unwrap();
})
.await
.unwrap();
let txn = store.env.read_txn().unwrap();
assert!(store.pending_queue.first(&txn).unwrap().is_none());
let update = store
.updates
.remap_key_type::<UpdateKeyCodec>()
.get(&txn, &(uuid, 0))
.unwrap()
.unwrap();
assert!(matches!(update, UpdateStatus::Processed(_)));
let update = store
.updates
.remap_key_type::<UpdateKeyCodec>()
.get(&txn, &(uuid, 1))
.unwrap()
.unwrap();
assert!(matches!(update, UpdateStatus::Failed(_)));
}
}

View File

@ -6,9 +6,8 @@ use grenad::CompressionType;
use milli::update::UpdateBuilder;
use rayon::ThreadPool;
use crate::index::UpdateResult;
use crate::index_controller::updates::{Failed, Processed, Processing};
use crate::index_controller::UpdateMeta;
use crate::index_controller::{Failed, Processed, Processing};
use crate::option::IndexerOpts;
pub struct UpdateHandler {
@ -59,10 +58,10 @@ impl UpdateHandler {
pub fn handle_update(
&self,
meta: Processing<UpdateMeta>,
content: File,
meta: Processing,
content: Option<File>,
index: Index,
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
) -> Result<Processed, Failed> {
use UpdateMeta::*;
let update_id = meta.id();

View File

@ -1,94 +1,121 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::path::{Path, PathBuf};
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Enqueued<M> {
pub update_id: u64,
pub meta: M,
pub enqueued_at: DateTime<Utc>,
pub index_uuid: Uuid,
use chrono::{DateTime, Utc};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use crate::index::{Facets, Settings};
pub type UpdateError = String;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
DocumentsAddition(DocumentAdditionResult),
DocumentDeletion { deleted: u64 },
Other,
}
impl<M> Enqueued<M> {
pub fn new(meta: M, update_id: u64, index_uuid: Uuid) -> Self {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
DocumentsAddition {
method: IndexDocumentsMethod,
format: UpdateFormat,
primary_key: Option<String>,
},
ClearDocuments,
DeleteDocuments,
Settings(Settings),
Facets(Facets),
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Enqueued {
pub update_id: u64,
pub meta: UpdateMeta,
pub enqueued_at: DateTime<Utc>,
pub content: Option<PathBuf>,
}
impl Enqueued {
pub fn new(meta: UpdateMeta, update_id: u64, content: Option<PathBuf>) -> Self {
Self {
enqueued_at: Utc::now(),
meta,
update_id,
index_uuid,
content,
}
}
pub fn processing(self) -> Processing<M> {
pub fn processing(self) -> Processing {
Processing {
from: self,
started_processing_at: Utc::now(),
}
}
pub fn abort(self) -> Aborted<M> {
pub fn abort(self) -> Aborted {
Aborted {
from: self,
aborted_at: Utc::now(),
}
}
pub fn meta(&self) -> &M {
pub fn meta(&self) -> &UpdateMeta {
&self.meta
}
pub fn id(&self) -> u64 {
self.update_id
}
pub fn content_path(&self) -> Option<&Path> {
self.content.as_deref()
}
}
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processed<M, N> {
pub success: N,
pub struct Processed {
pub success: UpdateResult,
pub processed_at: DateTime<Utc>,
#[serde(flatten)]
pub from: Processing<M>,
pub from: Processing,
}
impl<M, N> Processed<M, N> {
impl Processed {
pub fn id(&self) -> u64 {
self.from.id()
}
}
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processing<M> {
pub struct Processing {
#[serde(flatten)]
pub from: Enqueued<M>,
pub from: Enqueued,
pub started_processing_at: DateTime<Utc>,
}
impl<M> Processing<M> {
impl Processing {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn meta(&self) -> &M {
pub fn meta(&self) -> &UpdateMeta {
self.from.meta()
}
pub fn index_uuid(&self) -> &Uuid {
&self.from.index_uuid
}
pub fn process<N>(self, meta: N) -> Processed<M, N> {
pub fn process(self, success: UpdateResult) -> Processed {
Processed {
success: meta,
success,
from: self,
processed_at: Utc::now(),
}
}
pub fn fail<E>(self, error: E) -> Failed<M, E> {
pub fn fail(self, error: UpdateError) -> Failed {
Failed {
from: self,
error,
@ -97,46 +124,46 @@ impl<M> Processing<M> {
}
}
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Aborted<M> {
pub struct Aborted {
#[serde(flatten)]
from: Enqueued<M>,
from: Enqueued,
aborted_at: DateTime<Utc>,
}
impl<M> Aborted<M> {
impl Aborted {
pub fn id(&self) -> u64 {
self.from.id()
}
}
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Failed<M, E> {
pub struct Failed {
#[serde(flatten)]
from: Processing<M>,
error: E,
from: Processing,
error: UpdateError,
failed_at: DateTime<Utc>,
}
impl<M, E> Failed<M, E> {
impl Failed {
pub fn id(&self) -> u64 {
self.from.id()
}
}
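
The types above form a small lifecycle; a sketch of the two terminal paths (not part of the commit, using only the constructors and conversions defined in this file):

fn lifecycle(meta: UpdateMeta) {
    // Enqueued -> Processing happens when the update loop picks the update up.
    let processing = Enqueued::new(meta, 0, None).processing();
    // Processing -> Processed on success, Processing -> Failed on error.
    let ok: UpdateStatus = processing.clone().process(UpdateResult::Other).into();
    let err: UpdateStatus = processing.fail(String::from("index error")).into();
    assert!(matches!(ok, UpdateStatus::Processed(_)));
    assert!(matches!(err, UpdateStatus::Failed(_)));
}
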
#[derive(Debug, PartialEq, Eq, Hash, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus<M, N, E> {
Processing(Processing<M>),
Enqueued(Enqueued<M>),
Processed(Processed<M, N>),
Aborted(Aborted<M>),
Failed(Failed<M, E>),
pub enum UpdateStatus {
Processing(Processing),
Enqueued(Enqueued),
Processed(Processed),
Aborted(Aborted),
Failed(Failed),
}
impl<M, N, E> UpdateStatus<M, N, E> {
impl UpdateStatus {
pub fn id(&self) -> u64 {
match self {
UpdateStatus::Processing(u) => u.id(),
@ -147,7 +174,7 @@ impl<M, N, E> UpdateStatus<M, N, E> {
}
}
pub fn processed(&self) -> Option<&Processed<M, N>> {
pub fn processed(&self) -> Option<&Processed> {
match self {
UpdateStatus::Processed(p) => Some(p),
_ => None,
@ -155,32 +182,32 @@ impl<M, N, E> UpdateStatus<M, N, E> {
}
}
impl<M, N, E> From<Enqueued<M>> for UpdateStatus<M, N, E> {
fn from(other: Enqueued<M>) -> Self {
impl From<Enqueued> for UpdateStatus {
fn from(other: Enqueued) -> Self {
Self::Enqueued(other)
}
}
impl<M, N, E> From<Aborted<M>> for UpdateStatus<M, N, E> {
fn from(other: Aborted<M>) -> Self {
impl From<Aborted> for UpdateStatus {
fn from(other: Aborted) -> Self {
Self::Aborted(other)
}
}
impl<M, N, E> From<Processed<M, N>> for UpdateStatus<M, N, E> {
fn from(other: Processed<M, N>) -> Self {
impl From<Processed> for UpdateStatus {
fn from(other: Processed) -> Self {
Self::Processed(other)
}
}
impl<M, N, E> From<Processing<M>> for UpdateStatus<M, N, E> {
fn from(other: Processing<M>) -> Self {
impl From<Processing> for UpdateStatus {
fn from(other: Processing) -> Self {
Self::Processing(other)
}
}
impl<M, N, E> From<Failed<M, E>> for UpdateStatus<M, N, E> {
fn from(other: Failed<M, E>) -> Self {
impl From<Failed> for UpdateStatus {
fn from(other: Failed) -> Self {
Self::Failed(other)
}
}

View File

@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{collections::HashSet, path::PathBuf};
use log::{info, warn};
use tokio::sync::mpsc;
@ -78,7 +78,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
Ok(result)
}
async fn handle_snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
async fn handle_snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
self.store.snapshot(path).await
}

View File

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
@ -67,7 +68,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::SnapshotRequest { path, ret };
let _ = self.sender.send(msg).await;

View File

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::path::PathBuf;
use tokio::sync::oneshot;
@ -28,7 +29,7 @@ pub enum UuidResolveMsg {
},
SnapshotRequest {
path: PathBuf,
ret: oneshot::Sender<Result<Vec<Uuid>>>,
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
},
GetSize {
ret: oneshot::Sender<Result<u64>>,

View File

@ -3,6 +3,7 @@ mod handle_impl;
mod message;
mod store;
use std::collections::HashSet;
use std::path::PathBuf;
use thiserror::Error;
@ -29,7 +30,7 @@ pub trait UuidResolverHandle {
async fn create(&self, name: String) -> anyhow::Result<Uuid>;
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
}

View File

@ -1,5 +1,6 @@
use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use std::collections::HashSet;
use std::fs::create_dir_all;
use heed::{
types::{ByteSlice, Str},
@ -19,7 +20,7 @@ pub trait UuidStore {
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
}
@ -129,17 +130,17 @@ impl UuidStore for HeedUuidStore {
.await?
}
async fn snapshot(&self, mut path: PathBuf) -> Result<Vec<Uuid>> {
async fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = Vec::new();
let mut entries = HashSet::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push(uuid)
entries.insert(uuid);
}
// only perform snapshot if there are indexes

View File

@ -84,9 +84,9 @@ async fn delete_document(
.delete_documents(path.index_uid.clone(), vec![path.document_id.clone()])
.await
{
Ok(update_status) => {
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}
Ok(update_status) => Ok(
HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
),
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}
@ -107,14 +107,11 @@ async fn get_all_documents(
path: web::Path<IndexParam>,
params: web::Query<BrowseQuery>,
) -> Result<HttpResponse, ResponseError> {
let attributes_to_retrieve = params
.attributes_to_retrieve
.as_ref()
.and_then(|attrs| {
let attributes_to_retrieve = params.attributes_to_retrieve.as_ref().and_then(|attrs| {
let mut names = Vec::new();
for name in attrs.split(',').map(String::from) {
if name == "*" {
return None
return None;
}
names.push(name);
}
@ -163,9 +160,9 @@ async fn add_documents(
.await;
match addition_result {
Ok(update_status) => {
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}
Ok(update_status) => Ok(
HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
),
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}
@ -242,9 +239,9 @@ async fn delete_documents(
.collect();
match data.delete_documents(path.index_uid.clone(), ids).await {
Ok(update_status) => {
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}
Ok(update_status) => Ok(
HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
),
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}
@ -258,9 +255,9 @@ async fn clear_all_documents(
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
match data.clear_documents(path.index_uid.clone()).await {
Ok(update_status) => {
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
}
Ok(update_status) => Ok(
HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))
),
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}

View File

@ -144,9 +144,9 @@ async fn update_all(
.update_settings(index_uid.into_inner(), body.into_inner(), true)
.await
{
Ok(update_result) => {
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() })))
}
Ok(update_result) => Ok(
HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() }))
),
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}
@ -176,9 +176,9 @@ async fn delete_all(
.update_settings(index_uid.into_inner(), settings, false)
.await
{
Ok(update_result) => {
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() })))
}
Ok(update_result) => Ok(
HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() }))
),
Err(e) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() })))
}

View File

@ -1,15 +1,10 @@
use std::collections::BTreeMap;
use actix_web::get;
use actix_web::web;
use actix_web::HttpResponse;
use chrono::{DateTime, Utc};
use serde::Serialize;
use crate::data::Stats;
use crate::error::ResponseError;
use crate::helpers::Authentication;
use crate::index_controller::IndexStats;
use crate::routes::IndexParam;
use crate::Data;
@ -19,59 +14,19 @@ pub fn services(cfg: &mut web::ServiceConfig) {
.service(get_version);
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct IndexStatsResponse {
number_of_documents: u64,
is_indexing: bool,
fields_distribution: BTreeMap<String, u64>,
}
impl From<IndexStats> for IndexStatsResponse {
fn from(stats: IndexStats) -> Self {
Self {
number_of_documents: stats.number_of_documents,
is_indexing: stats.is_indexing,
fields_distribution: stats.fields_distribution.into_iter().collect(),
}
}
}
#[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")]
async fn get_index_stats(
data: web::Data<Data>,
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
let response: IndexStatsResponse = data.get_index_stats(path.index_uid.clone()).await?.into();
let response = data.get_index_stats(path.index_uid.clone()).await?;
Ok(HttpResponse::Ok().json(response))
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct StatsResponse {
database_size: u64,
last_update: Option<DateTime<Utc>>,
indexes: BTreeMap<String, IndexStatsResponse>,
}
impl From<Stats> for StatsResponse {
fn from(stats: Stats) -> Self {
Self {
database_size: stats.database_size,
last_update: stats.last_update,
indexes: stats
.indexes
.into_iter()
.map(|(uid, index_stats)| (uid, index_stats.into()))
.collect(),
}
}
}
#[get("/stats", wrap = "Authentication::Private")]
async fn get_stats(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
let response: StatsResponse = data.get_stats().await?.into();
let response = data.get_all_stats().await?;
Ok(HttpResponse::Ok().json(response))
}

View File

@ -185,12 +185,9 @@ impl Index<'_> {
self.service.get(url).await
}
make_settings_test_routes!(
distinct_attribute
);
make_settings_test_routes!(distinct_attribute);
}
pub struct GetDocumentOptions;
#[derive(Debug, Default)]

View File

@ -77,8 +77,8 @@ async fn document_addition_with_primary_key() {
"content": "foo",
}
]);
let (_response, code) = index.add_documents(documents, Some("primary")).await;
assert_eq!(code, 202);
let (response, code) = index.add_documents(documents, Some("primary")).await;
assert_eq!(code, 202, "response: {}", response);
index.wait_update_id(0).await;
@ -189,8 +189,8 @@ async fn replace_document() {
}
]);
let (_response, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202);
let (response, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202, "response: {}", response);
index.wait_update_id(0).await;
@ -260,8 +260,8 @@ async fn update_document() {
}
]);
let (_response, code) = index.update_documents(documents, None).await;
assert_eq!(code, 202);
let (response, code) = index.update_documents(documents, None).await;
assert_eq!(code, 202, "response: {}", response);
index.wait_update_id(1).await;

View File

@ -6,14 +6,18 @@ async fn set_and_reset_distinct_attribute() {
let server = Server::new().await;
let index = server.index("test");
let (_response, _code) = index.update_settings(json!({ "distinctAttribute": "test"})).await;
let (_response, _code) = index
.update_settings(json!({ "distinctAttribute": "test"}))
.await;
index.wait_update_id(0).await;
let (response, _) = index.settings().await;
assert_eq!(response["distinctAttribute"], "test");
index.update_settings(json!({ "distinctAttribute": null })).await;
index
.update_settings(json!({ "distinctAttribute": null }))
.await;
index.wait_update_id(1).await;

View File

@ -23,13 +23,7 @@ async fn get_settings() {
assert_eq!(settings["distinctAttribute"], json!(null));
assert_eq!(
settings["rankingRules"],
json!([
"words",
"typo",
"proximity",
"attribute",
"exactness"
])
json!(["words", "typo", "proximity", "attribute", "exactness"])
);
assert_eq!(settings["stopWords"], json!([]));
}

View File

@ -1,2 +1,2 @@
mod get_settings;
mod distinct;
mod get_settings;