MeiliSearch/src/index_controller/local_index_controller/mod.rs

146 lines
4.9 KiB
Rust
Raw Normal View History

2021-01-28 14:12:34 +01:00
mod update_store;
mod index_store;
mod update_handler;
use std::path::Path;
use std::sync::Arc;
2021-01-28 17:20:51 +01:00
use anyhow::bail;
2021-01-28 18:32:24 +01:00
use itertools::Itertools;
2021-02-01 19:51:47 +01:00
use milli::Index;
2021-01-28 14:12:34 +01:00
use crate::option::IndexerOpts;
2021-02-01 19:51:47 +01:00
use index_store::IndexStore;
2021-01-28 14:12:34 +01:00
use super::IndexController;
2021-01-28 15:14:48 +01:00
use super::updates::UpdateStatus;
2021-02-03 17:44:20 +01:00
use super::{UpdateMeta, UpdateResult, IndexMetadata};
2021-01-28 14:12:34 +01:00
pub struct LocalIndexController {
indexes: IndexStore,
2021-01-28 15:14:48 +01:00
update_db_size: u64,
index_db_size: u64,
2021-01-28 14:12:34 +01:00
}
impl LocalIndexController {
2021-01-28 15:14:48 +01:00
pub fn new(
path: impl AsRef<Path>,
opt: IndexerOpts,
index_db_size: u64,
update_db_size: u64,
) -> anyhow::Result<Self> {
2021-01-28 14:12:34 +01:00
let indexes = IndexStore::new(path, opt)?;
2021-01-28 15:14:48 +01:00
Ok(Self { indexes, index_db_size, update_db_size })
2021-01-28 14:12:34 +01:00
}
}
impl IndexController for LocalIndexController {
fn add_documents<S: AsRef<str>>(
&self,
2021-01-28 15:14:48 +01:00
index: S,
method: milli::update::IndexDocumentsMethod,
format: milli::update::UpdateFormat,
data: &[u8],
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
let meta = UpdateMeta::DocumentsAddition { method, format };
let pending = update_store.register_update(meta, data).unwrap();
Ok(pending.into())
2021-01-28 14:12:34 +01:00
}
2021-01-28 15:14:48 +01:00
fn update_settings<S: AsRef<str>>(
&self,
2021-01-28 16:57:53 +01:00
index: S,
settings: super::Settings
2021-01-28 15:14:48 +01:00
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
2021-01-28 16:57:53 +01:00
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
let meta = UpdateMeta::Settings(settings);
let pending = update_store.register_update(meta, &[]).unwrap();
Ok(pending.into())
2021-01-28 14:12:34 +01:00
}
2021-02-03 20:12:48 +01:00
fn create_index<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<()> {
self.indexes.create_index(index_uid, self.update_db_size, self.index_db_size)?;
Ok(())
2021-01-28 14:12:34 +01:00
}
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
todo!()
}
fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
todo!()
}
fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>> {
let index = self.indexes.index(name)?.map(|(i, _)| i);
Ok(index)
}
2021-01-28 17:20:51 +01:00
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
match self.indexes.index(&index)? {
Some((_, update_store)) => Ok(update_store.meta(id)?),
None => bail!("index {:?} doesn't exist", index.as_ref()),
}
}
2021-01-28 18:32:24 +01:00
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
2021-02-01 19:51:47 +01:00
match self.indexes.index(&index)? {
2021-01-28 18:32:24 +01:00
Some((_, update_store)) => {
let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| {
Ok(processing
.map(UpdateStatus::from)
.into_iter()
.chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u)))
.chain(aborted.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.chain(processed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.chain(failed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
.sorted_by(|a, b| a.id().cmp(&b.id()))
.collect())
})?;
Ok(updates)
}
2021-02-01 19:51:47 +01:00
None => bail!("index {} doesn't exist.", index.as_ref()),
2021-01-28 18:32:24 +01:00
}
}
2021-02-03 17:44:20 +01:00
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
let metas = self.indexes.list_indexes()?;
let mut output_meta = Vec::new();
for (name, meta) in metas {
let created_at = meta.created_at;
let uuid = meta.uuid;
let updated_at = self
.all_update_status(&name)?
.iter()
.filter_map(|u| u.processed().map(|u| u.processed_at))
.max()
.unwrap_or(created_at);
let index_meta = IndexMetadata {
name,
created_at,
updated_at,
uuid,
primary_key: None,
};
output_meta.push(index_meta);
}
Ok(output_meta)
}
2021-01-28 14:12:34 +01:00
}
#[cfg(test)]
mod test {
use super::*;
use tempfile::tempdir;
use crate::make_index_controller_tests;
make_index_controller_tests!({
let options = IndexerOpts::default();
let path = tempdir().unwrap();
let size = 4096 * 100;
LocalIndexController::new(path, options, size, size).unwrap()
});
}