clippy + fmt

This commit is contained in:
mpostma 2021-09-28 22:22:59 +02:00
parent 5fa9bc67d7
commit 102c46f88b
36 changed files with 596 additions and 473 deletions

View file

@ -1,9 +1,12 @@
use std::fmt;
use std::error::Error;
use std::fmt;
use meilisearch_error::{Code, ErrorCode};
use crate::{document_formats::DocumentFormatError, index_controller::update_file_store::UpdateFileStoreError};
use crate::{
document_formats::DocumentFormatError,
index_controller::update_file_store::UpdateFileStoreError,
};
pub type Result<T> = std::result::Result<T, UpdateLoopError>;
@ -28,7 +31,8 @@ pub enum UpdateLoopError {
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
where T: Sync + Send + 'static + fmt::Debug
where
T: Sync + Send + 'static + fmt::Debug,
{
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Internal(Box::new(other))

View file

@ -44,7 +44,11 @@ pub enum UpdateMsg {
}
impl UpdateMsg {
pub async fn snapshot(sender: &mpsc::Sender<Self>, path: PathBuf, indexes: Vec<Index>) -> Result<()> {
pub async fn snapshot(
sender: &mpsc::Sender<Self>,
path: PathBuf,
indexes: Vec<Index>,
) -> Result<()> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Snapshot { path, indexes, ret };
sender.send(msg).await?;

View file

@ -80,7 +80,7 @@ impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Rea
self.read(buf)
}
Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
None => return Ok(0),
None => Ok(0),
},
}
}
@ -109,7 +109,13 @@ impl UpdateLoop {
let must_exit = Arc::new(AtomicBool::new(false));
let update_file_store = UpdateFileStore::new(&path).unwrap();
let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone(), update_file_store.clone())?;
let store = UpdateStore::open(
options,
&path,
index_resolver,
must_exit.clone(),
update_file_store.clone(),
)?;
let inbox = Some(inbox);
@ -194,8 +200,8 @@ impl UpdateLoop {
update_file.persist()?;
Ok(())
}).await??;
})
.await??;
store::Update::DocumentAddition {
primary_key,
@ -216,7 +222,6 @@ impl UpdateLoop {
Ok(status.into())
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || {
@ -248,8 +253,7 @@ impl UpdateLoop {
async fn handle_snapshot(&self, indexes: Vec<Index>, path: PathBuf) -> Result<()> {
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path))
.await??;
tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path)).await??;
Ok(())
}

View file

@ -6,7 +6,10 @@ use meilisearch_error::{Code, ErrorCode};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize};
use crate::{Update, index::{Settings, Unchecked}};
use crate::{
index::{Settings, Unchecked},
Update,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
@ -160,7 +163,7 @@ impl Display for Failed {
}
}
impl Error for Failed { }
impl Error for Failed {}
impl ErrorCode for Failed {
fn error_code(&self) -> Code {

View file

@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::fs::{create_dir_all, File};
use std::io::{BufReader, Write};
use std::fs::{File, create_dir_all};
use std::path::{Path, PathBuf};
use heed::{EnvOpenOptions, RoTxn};
use rayon::prelude::*;
@ -11,7 +11,14 @@ use tempfile::{NamedTempFile, TempDir};
use uuid::Uuid;
use super::{Result, State, UpdateStore};
use crate::{Update, index::Index, index_controller::{update_file_store::UpdateFileStore, updates::status::{Enqueued, UpdateStatus}}};
use crate::{
index::Index,
index_controller::{
update_file_store::UpdateFileStore,
updates::status::{Enqueued, UpdateStatus},
},
Update,
};
#[derive(Serialize, Deserialize)]
struct UpdateEntry {
@ -20,11 +27,7 @@ struct UpdateEntry {
}
impl UpdateStore {
pub fn dump(
&self,
indexes: &[Index],
path: PathBuf,
) -> Result<()> {
pub fn dump(&self, indexes: &[Index], path: PathBuf) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
@ -35,7 +38,10 @@ impl UpdateStore {
self.dump_updates(&txn, &uuids, &path)?;
indexes.par_iter().try_for_each(|index| index.dump(&path)).unwrap();
indexes
.par_iter()
.try_for_each(|index| index.dump(&path))
.unwrap();
Ok(())
}
@ -74,11 +80,13 @@ impl UpdateStore {
let update = data.decode()?;
if let Enqueued {
meta: Update::DocumentAddition {
content_uuid, ..
}, ..
} = update {
self.update_file_store.dump(content_uuid, &dst_path).unwrap();
meta: Update::DocumentAddition { content_uuid, .. },
..
} = update
{
self.update_file_store
.dump(content_uuid, &dst_path)
.unwrap();
}
let update_json = UpdateEntry {
@ -122,7 +130,6 @@ impl UpdateStore {
dst: impl AsRef<Path>,
db_size: usize,
) -> anyhow::Result<()> {
println!("target path: {}", dst.as_ref().display());
let mut options = EnvOpenOptions::new();

View file

@ -17,25 +17,26 @@ use heed::zerocopy::U64;
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use log::error;
use parking_lot::{Mutex, MutexGuard};
use rayon::prelude::*;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::timeout;
use uuid::Uuid;
use rayon::prelude::*;
use codec::*;
use super::error::Result;
use super::status::{Enqueued, Processing};
use crate::EnvSizer;
use crate::index::Index;
use crate::index_controller::update_files_path;
use crate::index_controller::updates::*;
use crate::index::Index;
use crate::EnvSizer;
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Update {
DeleteDocuments(Vec<String>),
@ -164,7 +165,8 @@ impl UpdateStore {
must_exit: Arc<AtomicBool>,
update_file_store: UpdateFileStore,
) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::new(options, path, update_file_store)?;
let (update_store, mut notification_receiver) =
Self::new(options, path, update_file_store)?;
let update_store = Arc::new(update_store);
// Send a first notification to trigger the process.
@ -250,11 +252,7 @@ impl UpdateStore {
/// Registers the update content in the pending store and the meta
/// into the pending-meta store. Returns the new unique update id.
pub fn register_update(
&self,
index_uuid: Uuid,
update: Update,
) -> heed::Result<Enqueued> {
pub fn register_update(&self, index_uuid: Uuid, update: Update) -> heed::Result<Enqueued> {
let mut txn = self.env.write_txn()?;
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
let meta = Enqueued::new(update, update_id);
@ -299,7 +297,10 @@ impl UpdateStore {
/// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update(&self, index_resolver: Arc<HardStateIndexResolver>) -> Result<Option<()>> {
fn process_pending_update(
&self,
index_resolver: Arc<HardStateIndexResolver>,
) -> Result<Option<()>> {
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
let first_meta = self.pending_queue.first(&rtxn)?;
@ -315,8 +316,7 @@ impl UpdateStore {
let state = self.state.write();
state.swap(State::Processing(index_uuid, processing.clone()));
let result =
self.perform_update(processing, index_resolver, index_uuid, global_id);
let result = self.perform_update(processing, index_resolver, index_uuid, global_id);
state.swap(State::Idle);
@ -444,7 +444,7 @@ impl UpdateStore {
if uuid == index_uuid {
let mut _pending = pending.decode()?;
//if let Some(update_uuid) = pending.content.take() {
//uuids_to_remove.push(update_uuid);
//uuids_to_remove.push(update_uuid);
//}
// Invariant check: we can only delete the current entry when we don't hold
@ -495,15 +495,10 @@ impl UpdateStore {
Ok(())
}
pub fn snapshot(
&self,
indexes: Vec<Index>,
path: impl AsRef<Path>,
) -> Result<()> {
pub fn snapshot(&self, indexes: Vec<Index>, path: impl AsRef<Path>) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Snapshoting);
let txn = self.env.write_txn()?;
let update_path = path.as_ref().join("updates");
@ -523,19 +518,22 @@ impl UpdateStore {
let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) {
if let Enqueued {
meta: Update::DocumentAddition {
content_uuid, ..
},
meta: Update::DocumentAddition { content_uuid, .. },
..
} = pending.decode()?
{
self.update_file_store.snapshot(content_uuid, &path).unwrap();
self.update_file_store
.snapshot(content_uuid, &path)
.unwrap();
}
}
}
let path = path.as_ref().to_owned();
indexes.par_iter().try_for_each(|index| index.snapshot(path.clone())).unwrap();
indexes
.par_iter()
.try_for_each(|index| index.snapshot(path.clone()))
.unwrap();
Ok(())
}
@ -546,10 +544,7 @@ impl UpdateStore {
for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?;
if let Enqueued {
meta: store::Update::DocumentAddition {
content_uuid,
..
},
meta: store::Update::DocumentAddition { content_uuid, .. },
..
} = pending
{
@ -568,147 +563,147 @@ impl UpdateStore {
//#[cfg(test)]
//mod test {
//use super::*;
//use crate::index_controller::{
//index_actor::{error::IndexActorError, MockIndexActorHandle},
//UpdateResult,
//};
//use super::*;
//use crate::index_controller::{
//index_actor::{error::IndexActorError, MockIndexActorHandle},
//UpdateResult,
//};
//use futures::future::ok;
//use futures::future::ok;
//#[actix_rt::test]
//async fn test_next_id() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut options = EnvOpenOptions::new();
//let handle = Arc::new(MockIndexActorHandle::new());
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//#[actix_rt::test]
//async fn test_next_id() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut options = EnvOpenOptions::new();
//let handle = Arc::new(MockIndexActorHandle::new());
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let index1_uuid = Uuid::new_v4();
//let index2_uuid = Uuid::new_v4();
//let index1_uuid = Uuid::new_v4();
//let index2_uuid = Uuid::new_v4();
//let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//txn.commit().unwrap();
//assert_eq!((0, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//txn.commit().unwrap();
//assert_eq!((0, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
//txn.commit().unwrap();
//assert_eq!((1, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
//txn.commit().unwrap();
//assert_eq!((1, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//txn.commit().unwrap();
//assert_eq!((2, 1), ids);
//}
//#[actix_rt::test]
//async fn test_register_update() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut options = EnvOpenOptions::new();
//let handle = Arc::new(MockIndexActorHandle::new());
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let meta = UpdateMeta::ClearDocuments;
//let uuid = Uuid::new_v4();
//let store_clone = update_store.clone();
//tokio::task::spawn_blocking(move || {
//store_clone.register_update(meta, None, uuid).unwrap();
//})
//.await
//.unwrap();
//let txn = update_store.env.read_txn().unwrap();
//assert!(update_store
//.pending_queue
//.get(&txn, &(0, uuid, 0))
//.unwrap()
//.is_some());
//}
//#[actix_rt::test]
//async fn test_process_update() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut handle = MockIndexActorHandle::new();
//handle
//.expect_update()
//.times(2)
//.returning(|_index_uuid, processing, _file| {
//if processing.id() == 0 {
//Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
//} else {
//Box::pin(ok(Err(
//processing.fail(IndexActorError::ExistingPrimaryKey.into())
//)))
//}
//});
//let handle = Arc::new(handle);
//let mut options = EnvOpenOptions::new();
//options.map_size(4096 * 100);
//let store = UpdateStore::open(
//options,
//dir.path(),
//handle.clone(),
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//// wait a bit for the event loop exit.
//tokio::time::sleep(std::time::Duration::from_millis(50)).await;
//let mut txn = store.env.write_txn().unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
//let uuid = Uuid::new_v4();
//store
//.pending_queue
//.put(&mut txn, &(0, uuid, 0), &update)
//.unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
//store
//.pending_queue
//.put(&mut txn, &(1, uuid, 1), &update)
//.unwrap();
//txn.commit().unwrap();
//// Process the pending, and check that it has been moved to the update databases, and
//// removed from the pending database.
//let store_clone = store.clone();
//tokio::task::spawn_blocking(move || {
//store_clone.process_pending_update(handle.clone()).unwrap();
//store_clone.process_pending_update(handle).unwrap();
//})
//.await
//.unwrap();
//let txn = store.env.read_txn().unwrap();
//assert!(store.pending_queue.first(&txn).unwrap().is_none());
//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();
//assert!(matches!(update, UpdateStatus::Processed(_)));
//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();
//assert!(matches!(update, UpdateStatus::Failed(_)));
//}
//let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//txn.commit().unwrap();
//assert_eq!((2, 1), ids);
//}
//#[actix_rt::test]
//async fn test_register_update() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut options = EnvOpenOptions::new();
//let handle = Arc::new(MockIndexActorHandle::new());
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let meta = UpdateMeta::ClearDocuments;
//let uuid = Uuid::new_v4();
//let store_clone = update_store.clone();
//tokio::task::spawn_blocking(move || {
//store_clone.register_update(meta, None, uuid).unwrap();
//})
//.await
//.unwrap();
//let txn = update_store.env.read_txn().unwrap();
//assert!(update_store
//.pending_queue
//.get(&txn, &(0, uuid, 0))
//.unwrap()
//.is_some());
//}
//#[actix_rt::test]
//async fn test_process_update() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut handle = MockIndexActorHandle::new();
//handle
//.expect_update()
//.times(2)
//.returning(|_index_uuid, processing, _file| {
//if processing.id() == 0 {
//Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
//} else {
//Box::pin(ok(Err(
//processing.fail(IndexActorError::ExistingPrimaryKey.into())
//)))
//}
//});
//let handle = Arc::new(handle);
//let mut options = EnvOpenOptions::new();
//options.map_size(4096 * 100);
//let store = UpdateStore::open(
//options,
//dir.path(),
//handle.clone(),
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//// wait a bit for the event loop exit.
//tokio::time::sleep(std::time::Duration::from_millis(50)).await;
//let mut txn = store.env.write_txn().unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
//let uuid = Uuid::new_v4();
//store
//.pending_queue
//.put(&mut txn, &(0, uuid, 0), &update)
//.unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
//store
//.pending_queue
//.put(&mut txn, &(1, uuid, 1), &update)
//.unwrap();
//txn.commit().unwrap();
//// Process the pending, and check that it has been moved to the update databases, and
//// removed from the pending database.
//let store_clone = store.clone();
//tokio::task::spawn_blocking(move || {
//store_clone.process_pending_update(handle.clone()).unwrap();
//store_clone.process_pending_update(handle).unwrap();
//})
//.await
//.unwrap();
//let txn = store.env.read_txn().unwrap();
//assert!(store.pending_queue.first(&txn).unwrap().is_none());
//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();
//assert!(matches!(update, UpdateStatus::Processed(_)));
//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();
//assert!(matches!(update, UpdateStatus::Failed(_)));
//}
//}