mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-23 19:57:30 +01:00
Merge #2453
2453: test index resolver r=MarinPostma a=MarinPostma add some tests to the `IndexResolver` implementation of `BatchHandler` Co-authored-by: ad hoc <postma.marin@protonmail.com>
This commit is contained in:
commit
0928f3d41c
@ -121,7 +121,6 @@ mod real {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::marker::PhantomData;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -137,12 +136,12 @@ mod test {
|
||||
|
||||
pub enum MockDumpHandler<U, I> {
|
||||
Real(super::real::DumpHandler<U, I>),
|
||||
Mock(Mocker, PhantomData<(U, I)>),
|
||||
Mock(Mocker),
|
||||
}
|
||||
|
||||
impl<U, I> MockDumpHandler<U, I> {
|
||||
pub fn mock(mocker: Mocker) -> Self {
|
||||
Self::Mock(mocker, PhantomData)
|
||||
Self::Mock(mocker)
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,7 +172,7 @@ mod test {
|
||||
pub async fn run(&self, uid: String) -> Result<()> {
|
||||
match self {
|
||||
DumpHandler::Real(real) => real.run(uid).await,
|
||||
DumpHandler::Mock(mocker, _) => unsafe { mocker.get("run").call(uid) },
|
||||
DumpHandler::Mock(mocker) => unsafe { mocker.get("run").call(uid) },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use tokio::sync::mpsc::error::SendError as MpscSendError;
|
||||
use tokio::sync::oneshot::error::RecvError as OneshotRecvError;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{error::MilliError, index::error::IndexError};
|
||||
use crate::{error::MilliError, index::error::IndexError, update_file_store::UpdateFileStoreError};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, IndexResolverError>;
|
||||
|
||||
@ -49,7 +49,8 @@ internal_error!(
|
||||
uuid::Error,
|
||||
std::io::Error,
|
||||
tokio::task::JoinError,
|
||||
serde_json::Error
|
||||
serde_json::Error,
|
||||
UpdateFileStoreError
|
||||
);
|
||||
|
||||
impl ErrorCode for IndexResolverError {
|
||||
|
@ -27,6 +27,12 @@ use self::meta_store::IndexMeta;
|
||||
|
||||
pub type HardStateIndexResolver = IndexResolver<HeedMetaStore, MapIndexStore>;
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub use real::IndexResolver;
|
||||
|
||||
#[cfg(test)]
|
||||
pub use test::MockIndexResolver as IndexResolver;
|
||||
|
||||
/// An index uid is composed of only ascii alphanumeric characters, - and _, between 1 and 400
|
||||
/// bytes long
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
@ -96,349 +102,448 @@ impl FromStr for IndexUid {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IndexResolver<U, I> {
|
||||
index_uuid_store: U,
|
||||
index_store: I,
|
||||
pub file_store: UpdateFileStore,
|
||||
}
|
||||
mod real {
|
||||
use super::*;
|
||||
|
||||
impl IndexResolver<HeedMetaStore, MapIndexStore> {
|
||||
pub fn load_dump(
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
env: Arc<Env>,
|
||||
indexer_opts: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
HeedMetaStore::load_dump(&src, env)?;
|
||||
let indexes_path = src.as_ref().join("indexes");
|
||||
let indexes = indexes_path.read_dir()?;
|
||||
let indexer_config = IndexerConfig::try_from(indexer_opts)?;
|
||||
for index in indexes {
|
||||
Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
pub struct IndexResolver<U, I> {
|
||||
pub(super) index_uuid_store: U,
|
||||
pub(super) index_store: I,
|
||||
pub(super) file_store: UpdateFileStore,
|
||||
}
|
||||
}
|
||||
|
||||
impl<U, I> IndexResolver<U, I>
|
||||
where
|
||||
U: IndexMetaStore,
|
||||
I: IndexStore,
|
||||
{
|
||||
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
|
||||
Self {
|
||||
index_uuid_store,
|
||||
index_store,
|
||||
file_store,
|
||||
impl IndexResolver<HeedMetaStore, MapIndexStore> {
|
||||
pub fn load_dump(
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
env: Arc<Env>,
|
||||
indexer_opts: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
HeedMetaStore::load_dump(&src, env)?;
|
||||
let indexes_path = src.as_ref().join("indexes");
|
||||
let indexes = indexes_path.read_dir()?;
|
||||
let indexer_config = IndexerConfig::try_from(indexer_opts)?;
|
||||
for index in indexes {
|
||||
Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> {
|
||||
fn get_content_uuid(task: &Task) -> Uuid {
|
||||
match task {
|
||||
Task {
|
||||
content: TaskContent::DocumentAddition { content_uuid, .. },
|
||||
..
|
||||
} => *content_uuid,
|
||||
_ => panic!("unexpected task in the document addition batch"),
|
||||
impl<U, I> IndexResolver<U, I>
|
||||
where
|
||||
U: IndexMetaStore,
|
||||
I: IndexStore,
|
||||
{
|
||||
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
|
||||
Self {
|
||||
index_uuid_store,
|
||||
index_store,
|
||||
file_store,
|
||||
}
|
||||
}
|
||||
|
||||
let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
|
||||
|
||||
match tasks.first() {
|
||||
Some(Task {
|
||||
id,
|
||||
content:
|
||||
TaskContent::DocumentAddition {
|
||||
merge_strategy,
|
||||
primary_key,
|
||||
allow_index_creation,
|
||||
index_uid,
|
||||
pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> {
|
||||
fn get_content_uuid(task: &Task) -> Uuid {
|
||||
match task {
|
||||
Task {
|
||||
content: TaskContent::DocumentAddition { content_uuid, .. },
|
||||
..
|
||||
},
|
||||
..
|
||||
}) => {
|
||||
let primary_key = primary_key.clone();
|
||||
let method = *merge_strategy;
|
||||
} => *content_uuid,
|
||||
_ => panic!("unexpected task in the document addition batch"),
|
||||
}
|
||||
}
|
||||
|
||||
let index = if *allow_index_creation {
|
||||
self.get_or_create_index(index_uid.clone(), *id).await
|
||||
} else {
|
||||
self.get_index(index_uid.as_str().to_string()).await
|
||||
};
|
||||
let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
|
||||
|
||||
// If the index doesn't exist and we are not allowed to create it with the first
|
||||
// task, we must fails the whole batch.
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let index = match index {
|
||||
Ok(index) => index,
|
||||
Err(e) => {
|
||||
let error = ResponseError::from(e);
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(TaskEvent::Failed {
|
||||
error: error.clone(),
|
||||
timestamp: now,
|
||||
});
|
||||
}
|
||||
return tasks;
|
||||
}
|
||||
};
|
||||
|
||||
let file_store = self.file_store.clone();
|
||||
let result = spawn_blocking(move || {
|
||||
index.update_documents(
|
||||
method,
|
||||
primary_key,
|
||||
file_store,
|
||||
content_uuids.into_iter(),
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let event = match result {
|
||||
Ok(Ok(result)) => TaskEvent::Succeeded {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
result: TaskResult::DocumentAddition {
|
||||
indexed_documents: result.indexed_documents,
|
||||
match tasks.first() {
|
||||
Some(Task {
|
||||
id,
|
||||
content:
|
||||
TaskContent::DocumentAddition {
|
||||
merge_strategy,
|
||||
primary_key,
|
||||
allow_index_creation,
|
||||
index_uid,
|
||||
..
|
||||
},
|
||||
},
|
||||
Ok(Err(e)) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: e.into(),
|
||||
},
|
||||
Err(e) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: IndexResolverError::from(e).into(),
|
||||
},
|
||||
};
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
}
|
||||
|
||||
tasks
|
||||
}
|
||||
_ => panic!("invalid batch!"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
|
||||
match &task.content {
|
||||
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
|
||||
TaskContent::DocumentDeletion {
|
||||
deletion: DocumentDeletion::Ids(ids),
|
||||
index_uid,
|
||||
} => {
|
||||
let ids = ids.clone();
|
||||
let index = self.get_index(index_uid.clone().into_inner()).await?;
|
||||
|
||||
let DocumentDeletionResult {
|
||||
deleted_documents, ..
|
||||
} = spawn_blocking(move || index.delete_documents(&ids)).await??;
|
||||
|
||||
Ok(TaskResult::DocumentDeletion { deleted_documents })
|
||||
}
|
||||
TaskContent::DocumentDeletion {
|
||||
deletion: DocumentDeletion::Clear,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = self.get_index(index_uid.clone().into_inner()).await?;
|
||||
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
|
||||
let number_documents = index.stats()?.number_of_documents;
|
||||
index.clear_documents()?;
|
||||
Ok(number_documents)
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(TaskResult::ClearAll { deleted_documents })
|
||||
}
|
||||
TaskContent::SettingsUpdate {
|
||||
settings,
|
||||
is_deletion,
|
||||
allow_index_creation,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = if *is_deletion || !*allow_index_creation {
|
||||
self.get_index(index_uid.clone().into_inner()).await?
|
||||
} else {
|
||||
self.get_or_create_index(index_uid.clone(), task.id).await?
|
||||
};
|
||||
|
||||
let settings = settings.clone();
|
||||
spawn_blocking(move || index.update_settings(&settings.check())).await??;
|
||||
|
||||
Ok(TaskResult::Other)
|
||||
}
|
||||
TaskContent::IndexDeletion { index_uid } => {
|
||||
let index = self.delete_index(index_uid.clone().into_inner()).await?;
|
||||
|
||||
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
|
||||
Ok(index.stats()?.number_of_documents)
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(TaskResult::ClearAll { deleted_documents })
|
||||
}
|
||||
TaskContent::IndexCreation {
|
||||
primary_key,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = self.create_index(index_uid.clone(), task.id).await?;
|
||||
|
||||
if let Some(primary_key) = primary_key {
|
||||
..
|
||||
}) => {
|
||||
let primary_key = primary_key.clone();
|
||||
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
|
||||
}
|
||||
let method = *merge_strategy;
|
||||
|
||||
Ok(TaskResult::Other)
|
||||
}
|
||||
TaskContent::IndexUpdate {
|
||||
primary_key,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = self.get_index(index_uid.clone().into_inner()).await?;
|
||||
let index = if *allow_index_creation {
|
||||
self.get_or_create_index(index_uid.clone(), *id).await
|
||||
} else {
|
||||
self.get_index(index_uid.as_str().to_string()).await
|
||||
};
|
||||
|
||||
if let Some(primary_key) = primary_key {
|
||||
let primary_key = primary_key.clone();
|
||||
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
|
||||
}
|
||||
|
||||
Ok(TaskResult::Other)
|
||||
}
|
||||
_ => unreachable!("Invalid task for index resolver"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
for (_, index) in self.list().await? {
|
||||
index.dump(&path)?;
|
||||
}
|
||||
self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result<Index> {
|
||||
match self.index_uuid_store.get(uid.into_inner()).await? {
|
||||
(uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)),
|
||||
(uid, None) => {
|
||||
let uuid = Uuid::new_v4();
|
||||
let index = self.index_store.create(uuid).await?;
|
||||
match self
|
||||
.index_uuid_store
|
||||
.insert(
|
||||
uid,
|
||||
IndexMeta {
|
||||
uuid,
|
||||
creation_task_id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
match self.index_store.delete(uuid).await {
|
||||
Ok(Some(index)) => {
|
||||
index.close();
|
||||
// If the index doesn't exist and we are not allowed to create it with the first
|
||||
// task, we must fails the whole batch.
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let index = match index {
|
||||
Ok(index) => index,
|
||||
Err(e) => {
|
||||
let error = ResponseError::from(e);
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(TaskEvent::Failed {
|
||||
error: error.clone(),
|
||||
timestamp: now,
|
||||
});
|
||||
}
|
||||
Ok(None) => (),
|
||||
Err(e) => log::error!("Error while deleting index: {:?}", e),
|
||||
return tasks;
|
||||
}
|
||||
Err(e)
|
||||
};
|
||||
|
||||
let file_store = self.file_store.clone();
|
||||
let result = spawn_blocking(move || {
|
||||
index.update_documents(
|
||||
method,
|
||||
primary_key,
|
||||
file_store,
|
||||
content_uuids.into_iter(),
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let event = match result {
|
||||
Ok(Ok(result)) => TaskEvent::Succeeded {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
result: TaskResult::DocumentAddition {
|
||||
indexed_documents: result.indexed_documents,
|
||||
},
|
||||
},
|
||||
Ok(Err(e)) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: e.into(),
|
||||
},
|
||||
Err(e) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: IndexResolverError::from(e).into(),
|
||||
},
|
||||
};
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
}
|
||||
Ok(()) => Ok(index),
|
||||
|
||||
tasks
|
||||
}
|
||||
_ => panic!("invalid batch!"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create an index with name `uid`.
|
||||
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
|
||||
match self.create_index(uid, task_id).await {
|
||||
Ok(index) => Ok(index),
|
||||
Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await,
|
||||
Err(e) => Err(e),
|
||||
pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> {
|
||||
self.file_store.delete(content_uuid).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
|
||||
let uuids = self.index_uuid_store.list().await?;
|
||||
let mut indexes = Vec::new();
|
||||
for (name, IndexMeta { uuid, .. }) in uuids {
|
||||
match self.index_store.get(uuid).await? {
|
||||
Some(index) => indexes.push((name, index)),
|
||||
None => {
|
||||
// we found an unexisting index, we remove it from the uuid store
|
||||
let _ = self.index_uuid_store.delete(name).await;
|
||||
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
|
||||
match &task.content {
|
||||
TaskContent::DocumentAddition { .. } => {
|
||||
panic!("updates should be handled by batch")
|
||||
}
|
||||
TaskContent::DocumentDeletion {
|
||||
deletion: DocumentDeletion::Ids(ids),
|
||||
index_uid,
|
||||
} => {
|
||||
let ids = ids.clone();
|
||||
let index = self.get_index(index_uid.clone().into_inner()).await?;
|
||||
|
||||
let DocumentDeletionResult {
|
||||
deleted_documents, ..
|
||||
} = spawn_blocking(move || index.delete_documents(&ids)).await??;
|
||||
|
||||
Ok(TaskResult::DocumentDeletion { deleted_documents })
|
||||
}
|
||||
TaskContent::DocumentDeletion {
|
||||
deletion: DocumentDeletion::Clear,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = self.get_index(index_uid.clone().into_inner()).await?;
|
||||
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
|
||||
let number_documents = index.stats()?.number_of_documents;
|
||||
index.clear_documents()?;
|
||||
Ok(number_documents)
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(TaskResult::ClearAll { deleted_documents })
|
||||
}
|
||||
TaskContent::SettingsUpdate {
|
||||
settings,
|
||||
is_deletion,
|
||||
allow_index_creation,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = if *is_deletion || !*allow_index_creation {
|
||||
self.get_index(index_uid.clone().into_inner()).await?
|
||||
} else {
|
||||
self.get_or_create_index(index_uid.clone(), task.id).await?
|
||||
};
|
||||
|
||||
let settings = settings.clone();
|
||||
spawn_blocking(move || index.update_settings(&settings.check())).await??;
|
||||
|
||||
Ok(TaskResult::Other)
|
||||
}
|
||||
TaskContent::IndexDeletion { index_uid } => {
|
||||
let index = self.delete_index(index_uid.clone().into_inner()).await?;
|
||||
|
||||
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
|
||||
Ok(index.stats()?.number_of_documents)
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(TaskResult::ClearAll { deleted_documents })
|
||||
}
|
||||
TaskContent::IndexCreation {
|
||||
primary_key,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = self.create_index(index_uid.clone(), task.id).await?;
|
||||
|
||||
if let Some(primary_key) = primary_key {
|
||||
let primary_key = primary_key.clone();
|
||||
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
|
||||
}
|
||||
|
||||
Ok(TaskResult::Other)
|
||||
}
|
||||
TaskContent::IndexUpdate {
|
||||
primary_key,
|
||||
index_uid,
|
||||
} => {
|
||||
let index = self.get_index(index_uid.clone().into_inner()).await?;
|
||||
|
||||
if let Some(primary_key) = primary_key {
|
||||
let primary_key = primary_key.clone();
|
||||
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
|
||||
}
|
||||
|
||||
Ok(TaskResult::Other)
|
||||
}
|
||||
_ => unreachable!("Invalid task for index resolver"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
for (_, index) in self.list().await? {
|
||||
index.dump(&path)?;
|
||||
}
|
||||
self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result<Index> {
|
||||
match self.index_uuid_store.get(uid.into_inner()).await? {
|
||||
(uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)),
|
||||
(uid, None) => {
|
||||
let uuid = Uuid::new_v4();
|
||||
let index = self.index_store.create(uuid).await?;
|
||||
match self
|
||||
.index_uuid_store
|
||||
.insert(
|
||||
uid,
|
||||
IndexMeta {
|
||||
uuid,
|
||||
creation_task_id,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
match self.index_store.delete(uuid).await {
|
||||
Ok(Some(index)) => {
|
||||
index.close();
|
||||
}
|
||||
Ok(None) => (),
|
||||
Err(e) => log::error!("Error while deleting index: {:?}", e),
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
Ok(()) => Ok(index),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(indexes)
|
||||
}
|
||||
|
||||
pub async fn delete_index(&self, uid: String) -> Result<Index> {
|
||||
match self.index_uuid_store.delete(uid.clone()).await? {
|
||||
Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? {
|
||||
Some(index) => {
|
||||
index.clone().close();
|
||||
Ok(index)
|
||||
}
|
||||
None => Err(IndexResolverError::UnexistingIndex(uid)),
|
||||
},
|
||||
None => Err(IndexResolverError::UnexistingIndex(uid)),
|
||||
/// Get or create an index with name `uid`.
|
||||
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
|
||||
match self.create_index(uid, task_id).await {
|
||||
Ok(index) => Ok(index),
|
||||
Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await,
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index(&self, uid: String) -> Result<Index> {
|
||||
match self.index_uuid_store.get(uid).await? {
|
||||
(name, Some(IndexMeta { uuid, .. })) => {
|
||||
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
|
||||
let uuids = self.index_uuid_store.list().await?;
|
||||
let mut indexes = Vec::new();
|
||||
for (name, IndexMeta { uuid, .. }) in uuids {
|
||||
match self.index_store.get(uuid).await? {
|
||||
Some(index) => Ok(index),
|
||||
Some(index) => indexes.push((name, index)),
|
||||
None => {
|
||||
// For some reason we got a uuid to an unexisting index, we return an error,
|
||||
// and remove the uuid from the uuid store.
|
||||
let _ = self.index_uuid_store.delete(name.clone()).await;
|
||||
Err(IndexResolverError::UnexistingIndex(name))
|
||||
// we found an unexisting index, we remove it from the uuid store
|
||||
let _ = self.index_uuid_store.delete(name).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
(name, _) => Err(IndexResolverError::UnexistingIndex(name)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
|
||||
let (uid, meta) = self.index_uuid_store.get(index_uid).await?;
|
||||
meta.map(
|
||||
|IndexMeta {
|
||||
creation_task_id, ..
|
||||
}| creation_task_id,
|
||||
)
|
||||
.ok_or(IndexResolverError::UnexistingIndex(uid))
|
||||
Ok(indexes)
|
||||
}
|
||||
|
||||
pub async fn delete_index(&self, uid: String) -> Result<Index> {
|
||||
match self.index_uuid_store.delete(uid.clone()).await? {
|
||||
Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? {
|
||||
Some(index) => {
|
||||
index.clone().close();
|
||||
Ok(index)
|
||||
}
|
||||
None => Err(IndexResolverError::UnexistingIndex(uid)),
|
||||
},
|
||||
None => Err(IndexResolverError::UnexistingIndex(uid)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index(&self, uid: String) -> Result<Index> {
|
||||
match self.index_uuid_store.get(uid).await? {
|
||||
(name, Some(IndexMeta { uuid, .. })) => {
|
||||
match self.index_store.get(uuid).await? {
|
||||
Some(index) => Ok(index),
|
||||
None => {
|
||||
// For some reason we got a uuid to an unexisting index, we return an error,
|
||||
// and remove the uuid from the uuid store.
|
||||
let _ = self.index_uuid_store.delete(name.clone()).await;
|
||||
Err(IndexResolverError::UnexistingIndex(name))
|
||||
}
|
||||
}
|
||||
}
|
||||
(name, _) => Err(IndexResolverError::UnexistingIndex(name)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
|
||||
let (uid, meta) = self.index_uuid_store.get(index_uid).await?;
|
||||
meta.map(
|
||||
|IndexMeta {
|
||||
creation_task_id, ..
|
||||
}| creation_task_id,
|
||||
)
|
||||
.ok_or(IndexResolverError::UnexistingIndex(uid))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
// use std::{collections::BTreeMap, vec::IntoIter};
|
||||
//
|
||||
// use super::*;
|
||||
//
|
||||
// use futures::future::ok;
|
||||
// use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
|
||||
// use nelson::Mocker;
|
||||
// use proptest::prelude::*;
|
||||
//
|
||||
// use crate::{
|
||||
// index::{
|
||||
// error::{IndexError, Result as IndexResult},
|
||||
// Checked, IndexMeta, IndexStats, Settings,
|
||||
// },
|
||||
// tasks::{batch::Batch, BatchHandler},
|
||||
// };
|
||||
// use index_store::MockIndexStore;
|
||||
// use meta_store::MockIndexMetaStore;
|
||||
use super::*;
|
||||
|
||||
use nelson::Mocker;
|
||||
|
||||
pub enum MockIndexResolver<U, I> {
|
||||
Real(super::real::IndexResolver<U, I>),
|
||||
Mock(Mocker),
|
||||
}
|
||||
|
||||
impl MockIndexResolver<HeedMetaStore, MapIndexStore> {
|
||||
pub fn load_dump(
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
env: Arc<Env>,
|
||||
indexer_opts: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
super::real::IndexResolver::load_dump(src, dst, index_db_size, env, indexer_opts)
|
||||
}
|
||||
}
|
||||
|
||||
impl<U, I> MockIndexResolver<U, I>
|
||||
where
|
||||
U: IndexMetaStore,
|
||||
I: IndexStore,
|
||||
{
|
||||
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
|
||||
Self::Real(super::real::IndexResolver {
|
||||
index_uuid_store,
|
||||
index_store,
|
||||
file_store,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn mock(mocker: Mocker) -> Self {
|
||||
Self::Mock(mocker)
|
||||
}
|
||||
|
||||
pub async fn process_document_addition_batch(&self, tasks: Vec<Task>) -> Vec<Task> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await,
|
||||
IndexResolver::Mock(m) => unsafe {
|
||||
m.get("process_document_addition_batch").call(tasks)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.process_task(task).await,
|
||||
IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) },
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.dump(path).await,
|
||||
IndexResolver::Mock(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create an index with name `uid`.
|
||||
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.get_or_create_index(uid, task_id).await,
|
||||
IndexResolver::Mock(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.list().await,
|
||||
IndexResolver::Mock(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_index(&self, uid: String) -> Result<Index> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.delete_index(uid).await,
|
||||
IndexResolver::Mock(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index(&self, uid: String) -> Result<Index> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.get_index(uid).await,
|
||||
IndexResolver::Mock(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.get_index_creation_task_id(index_uid).await,
|
||||
IndexResolver::Mock(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> {
|
||||
match self {
|
||||
IndexResolver::Real(r) => r.delete_content_file(content_uuid).await,
|
||||
IndexResolver::Mock(m) => unsafe {
|
||||
m.get("delete_content_file").call(content_uuid)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: ignoring this test, it has become too complex to maintain, and rather implement
|
||||
// handler logic test.
|
||||
|
@ -38,7 +38,7 @@ where
|
||||
if let BatchContent::DocumentsAdditionBatch(ref tasks) = batch.content {
|
||||
for task in tasks {
|
||||
if let Some(content_uuid) = task.get_content_uuid() {
|
||||
if let Err(e) = self.file_store.delete(content_uuid).await {
|
||||
if let Err(e) = self.delete_content_file(content_uuid).await {
|
||||
log::error!("error deleting update file: {}", e);
|
||||
}
|
||||
}
|
||||
@ -49,7 +49,12 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore};
|
||||
use crate::index_resolver::index_store::MapIndexStore;
|
||||
use crate::index_resolver::meta_store::HeedMetaStore;
|
||||
use crate::index_resolver::{
|
||||
error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore,
|
||||
};
|
||||
use crate::tasks::task::TaskResult;
|
||||
use crate::tasks::{
|
||||
handlers::test::task_to_batch,
|
||||
task::{Task, TaskContent},
|
||||
@ -142,5 +147,58 @@ mod test {
|
||||
index_resolver.process_batch(batch).await;
|
||||
}
|
||||
|
||||
// TODO: test perform_batch. We need a Mocker for IndexResolver.
|
||||
proptest! {
|
||||
#[test]
|
||||
fn index_document_task_deletes_update_file(
|
||||
task in any::<Task>(),
|
||||
) {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let handle = rt.spawn(async {
|
||||
let mocker = Mocker::default();
|
||||
|
||||
if let TaskContent::DocumentAddition{ .. } = task.content {
|
||||
mocker.when::<Uuid, IndexResult<()>>("delete_content_file").then(|_| Ok(()));
|
||||
}
|
||||
|
||||
let index_resolver: IndexResolver<HeedMetaStore, MapIndexStore> = IndexResolver::mock(mocker);
|
||||
|
||||
let batch = task_to_batch(task);
|
||||
|
||||
index_resolver.finish(&batch).await;
|
||||
});
|
||||
|
||||
rt.block_on(handle).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handle_batch(task in any::<Task>()) {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let handle = rt.spawn(async {
|
||||
let mocker = Mocker::default();
|
||||
match task.content {
|
||||
TaskContent::DocumentAddition { .. } => {
|
||||
mocker.when::<Vec<Task>, Vec<Task>>("process_document_addition_batch").then(|tasks| tasks);
|
||||
}
|
||||
TaskContent::Dump { .. } => (),
|
||||
_ => {
|
||||
mocker.when::<&Task, IndexResult<TaskResult>>("process_task").then(|_| Ok(TaskResult::Other));
|
||||
}
|
||||
}
|
||||
let index_resolver: IndexResolver<HeedMetaStore, MapIndexStore> = IndexResolver::mock(mocker);
|
||||
|
||||
|
||||
let batch = task_to_batch(task);
|
||||
|
||||
if index_resolver.accept(&batch) {
|
||||
index_resolver.process_batch(batch).await;
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = rt.block_on(handle) {
|
||||
if e.is_panic() {
|
||||
std::panic::resume_unwind(e.into_panic());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user