2021-01-28 14:12:34 +01:00
|
|
|
mod update_store;
|
|
|
|
mod index_store;
|
|
|
|
mod update_handler;
|
|
|
|
|
|
|
|
use std::path::Path;
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
2021-02-08 10:47:34 +01:00
|
|
|
use anyhow::{bail, Context};
|
2021-01-28 18:32:24 +01:00
|
|
|
use itertools::Itertools;
|
2021-02-01 19:51:47 +01:00
|
|
|
use milli::Index;
|
2021-01-28 14:12:34 +01:00
|
|
|
|
|
|
|
use crate::option::IndexerOpts;
|
2021-02-01 19:51:47 +01:00
|
|
|
use index_store::IndexStore;
|
2021-01-28 14:12:34 +01:00
|
|
|
use super::IndexController;
|
2021-01-28 15:14:48 +01:00
|
|
|
use super::updates::UpdateStatus;
|
2021-02-03 17:44:20 +01:00
|
|
|
use super::{UpdateMeta, UpdateResult, IndexMetadata};
|
2021-01-28 14:12:34 +01:00
|
|
|
|
|
|
|
pub struct LocalIndexController {
|
|
|
|
indexes: IndexStore,
|
2021-01-28 15:14:48 +01:00
|
|
|
update_db_size: u64,
|
|
|
|
index_db_size: u64,
|
2021-01-28 14:12:34 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
impl LocalIndexController {
|
2021-01-28 15:14:48 +01:00
|
|
|
pub fn new(
|
|
|
|
path: impl AsRef<Path>,
|
|
|
|
opt: IndexerOpts,
|
|
|
|
index_db_size: u64,
|
|
|
|
update_db_size: u64,
|
|
|
|
) -> anyhow::Result<Self> {
|
2021-01-28 14:12:34 +01:00
|
|
|
let indexes = IndexStore::new(path, opt)?;
|
2021-01-28 15:14:48 +01:00
|
|
|
Ok(Self { indexes, index_db_size, update_db_size })
|
2021-01-28 14:12:34 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl IndexController for LocalIndexController {
|
|
|
|
fn add_documents<S: AsRef<str>>(
|
|
|
|
&self,
|
2021-01-28 15:14:48 +01:00
|
|
|
index: S,
|
|
|
|
method: milli::update::IndexDocumentsMethod,
|
|
|
|
format: milli::update::UpdateFormat,
|
|
|
|
data: &[u8],
|
|
|
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
|
|
|
|
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
|
|
|
|
let meta = UpdateMeta::DocumentsAddition { method, format };
|
|
|
|
let pending = update_store.register_update(meta, data).unwrap();
|
|
|
|
Ok(pending.into())
|
2021-01-28 14:12:34 +01:00
|
|
|
}
|
|
|
|
|
2021-01-28 15:14:48 +01:00
|
|
|
fn update_settings<S: AsRef<str>>(
|
|
|
|
&self,
|
2021-01-28 16:57:53 +01:00
|
|
|
index: S,
|
|
|
|
settings: super::Settings
|
2021-01-28 15:14:48 +01:00
|
|
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
|
2021-01-28 16:57:53 +01:00
|
|
|
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
|
|
|
|
let meta = UpdateMeta::Settings(settings);
|
|
|
|
let pending = update_store.register_update(meta, &[]).unwrap();
|
|
|
|
Ok(pending.into())
|
2021-01-28 14:12:34 +01:00
|
|
|
}
|
|
|
|
|
2021-02-08 10:47:34 +01:00
|
|
|
fn create_index(&self, index_name: impl AsRef<str>, primary_key: Option<impl AsRef<str>>) -> anyhow::Result<IndexMetadata> {
|
|
|
|
let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?;
|
|
|
|
if let Some(ref primary_key) = primary_key {
|
|
|
|
if let Err(e) = update_primary_key(index, primary_key).context("error creating index") {
|
|
|
|
// TODO: creating index could not be completed, delete everything.
|
|
|
|
Err(e)?
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let meta = IndexMetadata {
|
|
|
|
name: index_name.as_ref().to_owned(),
|
|
|
|
uuid: meta.uuid.clone(),
|
|
|
|
created_at: meta.created_at,
|
|
|
|
updated_at: meta.created_at,
|
|
|
|
primary_key: primary_key.map(|n| n.as_ref().to_owned()),
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(meta)
|
2021-01-28 14:12:34 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
|
|
|
todo!()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
|
|
|
|
todo!()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>> {
|
|
|
|
let index = self.indexes.index(name)?.map(|(i, _)| i);
|
|
|
|
Ok(index)
|
|
|
|
}
|
2021-01-28 17:20:51 +01:00
|
|
|
|
|
|
|
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
|
|
|
|
match self.indexes.index(&index)? {
|
|
|
|
Some((_, update_store)) => Ok(update_store.meta(id)?),
|
|
|
|
None => bail!("index {:?} doesn't exist", index.as_ref()),
|
|
|
|
}
|
|
|
|
}
|
2021-01-28 18:32:24 +01:00
|
|
|
|
|
|
|
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
|
2021-02-01 19:51:47 +01:00
|
|
|
match self.indexes.index(&index)? {
|
2021-01-28 18:32:24 +01:00
|
|
|
Some((_, update_store)) => {
|
|
|
|
let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| {
|
|
|
|
Ok(processing
|
|
|
|
.map(UpdateStatus::from)
|
|
|
|
.into_iter()
|
|
|
|
.chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u)))
|
|
|
|
.chain(aborted.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
|
|
|
|
.chain(processed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
|
|
|
|
.chain(failed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
|
|
|
|
.sorted_by(|a, b| a.id().cmp(&b.id()))
|
|
|
|
.collect())
|
|
|
|
})?;
|
|
|
|
Ok(updates)
|
|
|
|
}
|
2021-02-01 19:51:47 +01:00
|
|
|
None => bail!("index {} doesn't exist.", index.as_ref()),
|
2021-01-28 18:32:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
2021-02-03 17:44:20 +01:00
|
|
|
|
|
|
|
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
|
|
|
|
let metas = self.indexes.list_indexes()?;
|
|
|
|
let mut output_meta = Vec::new();
|
2021-02-08 10:47:34 +01:00
|
|
|
for (name, meta, primary_key) in metas {
|
2021-02-03 17:44:20 +01:00
|
|
|
let created_at = meta.created_at;
|
|
|
|
let uuid = meta.uuid;
|
|
|
|
let updated_at = self
|
|
|
|
.all_update_status(&name)?
|
|
|
|
.iter()
|
|
|
|
.filter_map(|u| u.processed().map(|u| u.processed_at))
|
|
|
|
.max()
|
|
|
|
.unwrap_or(created_at);
|
|
|
|
|
|
|
|
let index_meta = IndexMetadata {
|
|
|
|
name,
|
|
|
|
created_at,
|
|
|
|
updated_at,
|
|
|
|
uuid,
|
2021-02-08 10:47:34 +01:00
|
|
|
primary_key,
|
2021-02-03 17:44:20 +01:00
|
|
|
};
|
|
|
|
output_meta.push(index_meta);
|
|
|
|
}
|
|
|
|
Ok(output_meta)
|
|
|
|
}
|
2021-01-28 14:12:34 +01:00
|
|
|
}
|
2021-02-03 20:20:51 +01:00
|
|
|
|
2021-02-08 10:47:34 +01:00
|
|
|
fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {
|
|
|
|
let index = index.as_ref();
|
|
|
|
let mut txn = index.write_txn()?;
|
|
|
|
if index.primary_key(&txn)?.is_some() {
|
|
|
|
bail!("primary key already set.")
|
|
|
|
}
|
|
|
|
index.put_primary_key(&mut txn, primary_key.as_ref())?;
|
|
|
|
txn.commit()?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-02-03 20:20:51 +01:00
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use super::*;
|
|
|
|
use tempfile::tempdir;
|
|
|
|
use crate::make_index_controller_tests;
|
|
|
|
|
|
|
|
make_index_controller_tests!({
|
|
|
|
let options = IndexerOpts::default();
|
|
|
|
let path = tempdir().unwrap();
|
|
|
|
let size = 4096 * 100;
|
|
|
|
LocalIndexController::new(path, options, size, size).unwrap()
|
|
|
|
});
|
|
|
|
}
|