move IndexResolver to real module

This commit is contained in:
ad hoc 2022-06-01 09:53:07 +02:00
parent c3003065e8
commit bbd685af5e
No known key found for this signature in database
GPG Key ID: 4F00A782990CC643

View File

@ -27,6 +27,8 @@ use self::meta_store::IndexMeta;
pub type HardStateIndexResolver = IndexResolver<HeedMetaStore, MapIndexStore>; pub type HardStateIndexResolver = IndexResolver<HeedMetaStore, MapIndexStore>;
pub use real::IndexResolver;
/// An index uid is composed of only ascii alphanumeric characters, - and _, between 1 and 400 /// An index uid is composed of only ascii alphanumeric characters, - and _, between 1 and 400
/// bytes long /// bytes long
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
@ -96,326 +98,332 @@ impl FromStr for IndexUid {
} }
} }
pub struct IndexResolver<U, I> { mod real {
index_uuid_store: U, use super::*;
index_store: I,
pub file_store: UpdateFileStore,
}
impl IndexResolver<HeedMetaStore, MapIndexStore> { pub struct IndexResolver<U, I> {
pub fn load_dump( index_uuid_store: U,
src: impl AsRef<Path>, index_store: I,
dst: impl AsRef<Path>, pub file_store: UpdateFileStore,
index_db_size: usize,
env: Arc<Env>,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
HeedMetaStore::load_dump(&src, env)?;
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
let indexer_config = IndexerConfig::try_from(indexer_opts)?;
for index in indexes {
Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?;
}
Ok(())
} }
}
impl<U, I> IndexResolver<U, I> impl IndexResolver<HeedMetaStore, MapIndexStore> {
where pub fn load_dump(
U: IndexMetaStore, src: impl AsRef<Path>,
I: IndexStore, dst: impl AsRef<Path>,
{ index_db_size: usize,
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { env: Arc<Env>,
Self { indexer_opts: &IndexerOpts,
index_uuid_store, ) -> anyhow::Result<()> {
index_store, HeedMetaStore::load_dump(&src, env)?;
file_store, let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
let indexer_config = IndexerConfig::try_from(indexer_opts)?;
for index in indexes {
Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?;
}
Ok(())
} }
} }
pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> { impl<U, I> IndexResolver<U, I>
fn get_content_uuid(task: &Task) -> Uuid { where
match task { U: IndexMetaStore,
Task { I: IndexStore,
content: TaskContent::DocumentAddition { content_uuid, .. }, {
.. pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
} => *content_uuid, Self {
_ => panic!("unexpected task in the document addition batch"), index_uuid_store,
index_store,
file_store,
} }
} }
let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>(); pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> {
fn get_content_uuid(task: &Task) -> Uuid {
match tasks.first() { match task {
Some(Task { Task {
id, content: TaskContent::DocumentAddition { content_uuid, .. },
content:
TaskContent::DocumentAddition {
merge_strategy,
primary_key,
allow_index_creation,
index_uid,
.. ..
}, } => *content_uuid,
.. _ => panic!("unexpected task in the document addition batch"),
}) => { }
let primary_key = primary_key.clone(); }
let method = *merge_strategy;
let index = if *allow_index_creation { let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
self.get_or_create_index(index_uid.clone(), *id).await
} else {
self.get_index(index_uid.as_str().to_string()).await
};
// If the index doesn't exist and we are not allowed to create it with the first match tasks.first() {
// task, we must fail the whole batch. Some(Task {
let now = OffsetDateTime::now_utc(); id,
let index = match index { content:
Ok(index) => index, TaskContent::DocumentAddition {
Err(e) => { merge_strategy,
let error = ResponseError::from(e); primary_key,
for task in tasks.iter_mut() { allow_index_creation,
task.events.push(TaskEvent::Failed { index_uid,
error: error.clone(), ..
timestamp: now,
});
}
return tasks;
}
};
let file_store = self.file_store.clone();
let result = spawn_blocking(move || {
index.update_documents(
method,
primary_key,
file_store,
content_uuids.into_iter(),
)
})
.await;
let event = match result {
Ok(Ok(result)) => TaskEvent::Succeeded {
timestamp: OffsetDateTime::now_utc(),
result: TaskResult::DocumentAddition {
indexed_documents: result.indexed_documents,
}, },
}, ..
Ok(Err(e)) => TaskEvent::Failed { }) => {
timestamp: OffsetDateTime::now_utc(),
error: e.into(),
},
Err(e) => TaskEvent::Failed {
timestamp: OffsetDateTime::now_utc(),
error: IndexResolverError::from(e).into(),
},
};
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
tasks
}
_ => panic!("invalid batch!"),
}
}
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
match &task.content {
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Ids(ids),
index_uid,
} => {
let ids = ids.clone();
let index = self.get_index(index_uid.clone().into_inner()).await?;
let DocumentDeletionResult {
deleted_documents, ..
} = spawn_blocking(move || index.delete_documents(&ids)).await??;
Ok(TaskResult::DocumentDeletion { deleted_documents })
}
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Clear,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
let number_documents = index.stats()?.number_of_documents;
index.clear_documents()?;
Ok(number_documents)
})
.await??;
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::SettingsUpdate {
settings,
is_deletion,
allow_index_creation,
index_uid,
} => {
let index = if *is_deletion || !*allow_index_creation {
self.get_index(index_uid.clone().into_inner()).await?
} else {
self.get_or_create_index(index_uid.clone(), task.id).await?
};
let settings = settings.clone();
spawn_blocking(move || index.update_settings(&settings.check())).await??;
Ok(TaskResult::Other)
}
TaskContent::IndexDeletion { index_uid } => {
let index = self.delete_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
Ok(index.stats()?.number_of_documents)
})
.await??;
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::IndexCreation {
primary_key,
index_uid,
} => {
let index = self.create_index(index_uid.clone(), task.id).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone(); let primary_key = primary_key.clone();
spawn_blocking(move || index.update_primary_key(primary_key)).await??; let method = *merge_strategy;
}
Ok(TaskResult::Other) let index = if *allow_index_creation {
} self.get_or_create_index(index_uid.clone(), *id).await
TaskContent::IndexUpdate { } else {
primary_key, self.get_index(index_uid.as_str().to_string()).await
index_uid, };
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
if let Some(primary_key) = primary_key { // If the index doesn't exist and we are not allowed to create it with the first
let primary_key = primary_key.clone(); // task, we must fail the whole batch.
spawn_blocking(move || index.update_primary_key(primary_key)).await??; let now = OffsetDateTime::now_utc();
} let index = match index {
Ok(index) => index,
Ok(TaskResult::Other) Err(e) => {
} let error = ResponseError::from(e);
_ => unreachable!("Invalid task for index resolver"), for task in tasks.iter_mut() {
} task.events.push(TaskEvent::Failed {
} error: error.clone(),
timestamp: now,
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> { });
for (_, index) in self.list().await? {
index.dump(&path)?;
}
self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
Ok(())
}
async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result<Index> {
match self.index_uuid_store.get(uid.into_inner()).await? {
(uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)),
(uid, None) => {
let uuid = Uuid::new_v4();
let index = self.index_store.create(uuid).await?;
match self
.index_uuid_store
.insert(
uid,
IndexMeta {
uuid,
creation_task_id,
},
)
.await
{
Err(e) => {
match self.index_store.delete(uuid).await {
Ok(Some(index)) => {
index.close();
} }
Ok(None) => (), return tasks;
Err(e) => log::error!("Error while deleting index: {:?}", e),
} }
Err(e) };
let file_store = self.file_store.clone();
let result = spawn_blocking(move || {
index.update_documents(
method,
primary_key,
file_store,
content_uuids.into_iter(),
)
})
.await;
let event = match result {
Ok(Ok(result)) => TaskEvent::Succeeded {
timestamp: OffsetDateTime::now_utc(),
result: TaskResult::DocumentAddition {
indexed_documents: result.indexed_documents,
},
},
Ok(Err(e)) => TaskEvent::Failed {
timestamp: OffsetDateTime::now_utc(),
error: e.into(),
},
Err(e) => TaskEvent::Failed {
timestamp: OffsetDateTime::now_utc(),
error: IndexResolverError::from(e).into(),
},
};
for task in tasks.iter_mut() {
task.events.push(event.clone());
} }
Ok(()) => Ok(index),
tasks
} }
_ => panic!("invalid batch!"),
} }
} }
}
/// Get or create an index with name `uid`. pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> { match &task.content {
match self.create_index(uid, task_id).await { TaskContent::DocumentAddition { .. } => {
Ok(index) => Ok(index), panic!("updates should be handled by batch")
Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await, }
Err(e) => Err(e), TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Ids(ids),
index_uid,
} => {
let ids = ids.clone();
let index = self.get_index(index_uid.clone().into_inner()).await?;
let DocumentDeletionResult {
deleted_documents, ..
} = spawn_blocking(move || index.delete_documents(&ids)).await??;
Ok(TaskResult::DocumentDeletion { deleted_documents })
}
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Clear,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
let number_documents = index.stats()?.number_of_documents;
index.clear_documents()?;
Ok(number_documents)
})
.await??;
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::SettingsUpdate {
settings,
is_deletion,
allow_index_creation,
index_uid,
} => {
let index = if *is_deletion || !*allow_index_creation {
self.get_index(index_uid.clone().into_inner()).await?
} else {
self.get_or_create_index(index_uid.clone(), task.id).await?
};
let settings = settings.clone();
spawn_blocking(move || index.update_settings(&settings.check())).await??;
Ok(TaskResult::Other)
}
TaskContent::IndexDeletion { index_uid } => {
let index = self.delete_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
Ok(index.stats()?.number_of_documents)
})
.await??;
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::IndexCreation {
primary_key,
index_uid,
} => {
let index = self.create_index(index_uid.clone(), task.id).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
}
Ok(TaskResult::Other)
}
TaskContent::IndexUpdate {
primary_key,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
}
Ok(TaskResult::Other)
}
_ => unreachable!("Invalid task for index resolver"),
}
} }
}
pub async fn list(&self) -> Result<Vec<(String, Index)>> { pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
let uuids = self.index_uuid_store.list().await?; for (_, index) in self.list().await? {
let mut indexes = Vec::new(); index.dump(&path)?;
for (name, IndexMeta { uuid, .. }) in uuids { }
match self.index_store.get(uuid).await? { self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
Some(index) => indexes.push((name, index)), Ok(())
None => { }
// we found an unexisting index, we remove it from the uuid store
let _ = self.index_uuid_store.delete(name).await; async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result<Index> {
match self.index_uuid_store.get(uid.into_inner()).await? {
(uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)),
(uid, None) => {
let uuid = Uuid::new_v4();
let index = self.index_store.create(uuid).await?;
match self
.index_uuid_store
.insert(
uid,
IndexMeta {
uuid,
creation_task_id,
},
)
.await
{
Err(e) => {
match self.index_store.delete(uuid).await {
Ok(Some(index)) => {
index.close();
}
Ok(None) => (),
Err(e) => log::error!("Error while deleting index: {:?}", e),
}
Err(e)
}
Ok(()) => Ok(index),
}
} }
} }
} }
Ok(indexes) /// Get or create an index with name `uid`.
} pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
match self.create_index(uid, task_id).await {
pub async fn delete_index(&self, uid: String) -> Result<Index> { Ok(index) => Ok(index),
match self.index_uuid_store.delete(uid.clone()).await? { Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await,
Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? { Err(e) => Err(e),
Some(index) => { }
index.clone().close();
Ok(index)
}
None => Err(IndexResolverError::UnexistingIndex(uid)),
},
None => Err(IndexResolverError::UnexistingIndex(uid)),
} }
}
pub async fn get_index(&self, uid: String) -> Result<Index> { pub async fn list(&self) -> Result<Vec<(String, Index)>> {
match self.index_uuid_store.get(uid).await? { let uuids = self.index_uuid_store.list().await?;
(name, Some(IndexMeta { uuid, .. })) => { let mut indexes = Vec::new();
for (name, IndexMeta { uuid, .. }) in uuids {
match self.index_store.get(uuid).await? { match self.index_store.get(uuid).await? {
Some(index) => Ok(index), Some(index) => indexes.push((name, index)),
None => { None => {
// For some reason we got a uuid to an unexisting index, we return an error, // we found an unexisting index, we remove it from the uuid store
// and remove the uuid from the uuid store. let _ = self.index_uuid_store.delete(name).await;
let _ = self.index_uuid_store.delete(name.clone()).await;
Err(IndexResolverError::UnexistingIndex(name))
} }
} }
} }
(name, _) => Err(IndexResolverError::UnexistingIndex(name)),
}
}
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> { Ok(indexes)
let (uid, meta) = self.index_uuid_store.get(index_uid).await?; }
meta.map(
|IndexMeta { pub async fn delete_index(&self, uid: String) -> Result<Index> {
creation_task_id, .. match self.index_uuid_store.delete(uid.clone()).await? {
}| creation_task_id, Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? {
) Some(index) => {
.ok_or(IndexResolverError::UnexistingIndex(uid)) index.clone().close();
Ok(index)
}
None => Err(IndexResolverError::UnexistingIndex(uid)),
},
None => Err(IndexResolverError::UnexistingIndex(uid)),
}
}
pub async fn get_index(&self, uid: String) -> Result<Index> {
match self.index_uuid_store.get(uid).await? {
(name, Some(IndexMeta { uuid, .. })) => {
match self.index_store.get(uuid).await? {
Some(index) => Ok(index),
None => {
// For some reason we got a uuid to an unexisting index, we return an error,
// and remove the uuid from the uuid store.
let _ = self.index_uuid_store.delete(name.clone()).await;
Err(IndexResolverError::UnexistingIndex(name))
}
}
}
(name, _) => Err(IndexResolverError::UnexistingIndex(name)),
}
}
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
let (uid, meta) = self.index_uuid_store.get(index_uid).await?;
meta.map(
|IndexMeta {
creation_task_id, ..
}| creation_task_id,
)
.ok_or(IndexResolverError::UnexistingIndex(uid))
}
} }
} }