mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 21:04:27 +01:00
architecture rework
This commit is contained in:
parent
6a3f625e11
commit
74410d8c6b
24
Cargo.lock
generated
24
Cargo.lock
generated
@ -1137,6 +1137,17 @@ dependencies = [
|
||||
"wasi 0.9.0+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"libc",
|
||||
"wasi 0.10.0+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gimli"
|
||||
version = "0.23.0"
|
||||
@ -1608,7 +1619,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
|
||||
|
||||
[[package]]
|
||||
name = "meilisearch-error"
|
||||
version = "0.18.0"
|
||||
version = "0.18.1"
|
||||
dependencies = [
|
||||
"actix-http",
|
||||
]
|
||||
@ -1669,6 +1680,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"ureq",
|
||||
"uuid",
|
||||
"vergen",
|
||||
"walkdir",
|
||||
"whoami",
|
||||
@ -2263,7 +2275,7 @@ version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"getrandom 0.1.15",
|
||||
"libc",
|
||||
"rand_chacha",
|
||||
"rand_core 0.5.1",
|
||||
@ -2302,7 +2314,7 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"getrandom 0.1.15",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -3365,11 +3377,11 @@ checksum = "9071ac216321a4470a69fb2b28cfc68dcd1a39acd877c8be8e014df6772d8efa"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "0.8.1"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
|
||||
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
|
||||
dependencies = [
|
||||
"rand 0.7.3",
|
||||
"getrandom 0.2.2",
|
||||
"serde",
|
||||
]
|
||||
|
||||
|
@ -62,6 +62,7 @@ dashmap = "4.0.2"
|
||||
page_size = "0.4.2"
|
||||
obkv = "0.1.1"
|
||||
ouroboros = "0.8.0"
|
||||
uuid = "0.8.2"
|
||||
|
||||
[dependencies.sentry]
|
||||
default-features = false
|
||||
|
@ -5,19 +5,20 @@ pub use search::{SearchQuery, SearchResult};
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::fs::create_dir_all;
|
||||
|
||||
use sha2::Digest;
|
||||
|
||||
use crate::{option::Opt, index_controller::Settings};
|
||||
use crate::index_controller::{IndexStore, UpdateStore};
|
||||
use crate::index_controller::{IndexController, LocalIndexController};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Data {
|
||||
inner: Arc<DataInner<UpdateStore>>,
|
||||
inner: Arc<DataInner>,
|
||||
}
|
||||
|
||||
impl Deref for Data {
|
||||
type Target = DataInner<UpdateStore>;
|
||||
type Target = DataInner;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
@ -25,8 +26,8 @@ impl Deref for Data {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DataInner<I> {
|
||||
pub indexes: Arc<I>,
|
||||
pub struct DataInner {
|
||||
pub index_controller: Arc<LocalIndexController>,
|
||||
api_keys: ApiKeys,
|
||||
options: Opt,
|
||||
}
|
||||
@ -58,8 +59,9 @@ impl ApiKeys {
|
||||
impl Data {
|
||||
pub fn new(options: Opt) -> anyhow::Result<Data> {
|
||||
let path = options.db_path.clone();
|
||||
let index_store = IndexStore::new(&path)?;
|
||||
let index_controller = UpdateStore::new(index_store);
|
||||
let indexer_opts = options.indexer_options.clone();
|
||||
create_dir_all(&path)?;
|
||||
let index_controller = LocalIndexController::new(&path, indexer_opts)?;
|
||||
let indexes = Arc::new(index_controller);
|
||||
|
||||
let mut api_keys = ApiKeys {
|
||||
@ -70,28 +72,31 @@ impl Data {
|
||||
|
||||
api_keys.generate_missing_api_keys();
|
||||
|
||||
let inner = DataInner { indexes, options, api_keys };
|
||||
let inner = DataInner { index_controller: indexes, options, api_keys };
|
||||
let inner = Arc::new(inner);
|
||||
|
||||
Ok(Data { inner })
|
||||
}
|
||||
|
||||
pub fn settings<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<Settings> {
|
||||
let index = self.indexes
|
||||
.get(&index_uid)?
|
||||
let index = self.index_controller
|
||||
.index(&index_uid)?
|
||||
.ok_or_else(|| anyhow::anyhow!("Index {} does not exist.", index_uid.as_ref()))?;
|
||||
|
||||
let txn = index.read_txn()?;
|
||||
|
||||
let displayed_attributes = index
|
||||
.displayed_fields()?
|
||||
.displayed_fields(&txn)?
|
||||
.map(|fields| fields.into_iter().map(String::from).collect())
|
||||
.unwrap_or_else(|| vec!["*".to_string()]);
|
||||
|
||||
let searchable_attributes = index
|
||||
.searchable_fields()?
|
||||
.searchable_fields(&txn)?
|
||||
.map(|fields| fields.into_iter().map(String::from).collect())
|
||||
.unwrap_or_else(|| vec!["*".to_string()]);
|
||||
|
||||
let faceted_attributes = index.faceted_fields()?
|
||||
let faceted_attributes = index
|
||||
.faceted_fields(&txn)?
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.to_string()))
|
||||
.collect();
|
||||
|
@ -4,11 +4,11 @@ use std::time::Instant;
|
||||
|
||||
use serde_json::{Value, Map};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use milli::{SearchResult as Results, obkv_to_json};
|
||||
use milli::{Index, obkv_to_json, FacetCondition};
|
||||
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
|
||||
use anyhow::bail;
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
use crate::index_controller::IndexController;
|
||||
use super::Data;
|
||||
|
||||
const DEFAULT_SEARCH_LIMIT: usize = 20;
|
||||
@ -26,11 +26,68 @@ pub struct SearchQuery {
|
||||
pub attributes_to_retrieve: Option<Vec<String>>,
|
||||
pub attributes_to_crop: Option<Vec<String>>,
|
||||
pub crop_length: Option<usize>,
|
||||
pub attributes_to_highlight: Option<Vec<String>>,
|
||||
pub attributes_to_highlight: Option<HashSet<String>>,
|
||||
pub filters: Option<String>,
|
||||
pub matches: Option<bool>,
|
||||
pub facet_filters: Option<Value>,
|
||||
pub facets_distribution: Option<Vec<String>>,
|
||||
pub facet_condition: Option<String>,
|
||||
}
|
||||
|
||||
impl SearchQuery {
|
||||
pub fn perform(&self, index: impl AsRef<Index>) -> anyhow::Result<SearchResult>{
|
||||
let index = index.as_ref();
|
||||
|
||||
let before_search = Instant::now();
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
|
||||
let mut search = index.search(&rtxn);
|
||||
|
||||
if let Some(ref query) = self.q {
|
||||
search.query(query);
|
||||
}
|
||||
|
||||
if let Some(ref condition) = self.facet_condition {
|
||||
if !condition.trim().is_empty() {
|
||||
let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap();
|
||||
search.facet_condition(condition);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(offset) = self.offset {
|
||||
search.offset(offset);
|
||||
}
|
||||
|
||||
let milli::SearchResult { documents_ids, found_words, nb_hits, limit, } = search.execute()?;
|
||||
|
||||
let mut documents = Vec::new();
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
|
||||
let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
|
||||
Some(fields) => fields,
|
||||
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
||||
};
|
||||
|
||||
let stop_words = fst::Set::default();
|
||||
let highlighter = Highlighter::new(&stop_words);
|
||||
|
||||
for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
|
||||
let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
|
||||
if let Some(ref attributes_to_highlight) = self.attributes_to_highlight {
|
||||
highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight);
|
||||
}
|
||||
documents.push(object);
|
||||
}
|
||||
|
||||
Ok(SearchResult {
|
||||
hits: documents,
|
||||
nb_hits,
|
||||
query: self.q.clone().unwrap_or_default(),
|
||||
limit,
|
||||
offset: self.offset.unwrap_or_default(),
|
||||
processing_time_ms: before_search.elapsed().as_millis(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@ -105,45 +162,9 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
|
||||
|
||||
impl Data {
|
||||
pub fn search<S: AsRef<str>>(&self, index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
|
||||
let start = Instant::now();
|
||||
let index = self.indexes
|
||||
.get(&index)?
|
||||
.ok_or_else(|| Error::OpenIndex(format!("Index {} doesn't exists.", index.as_ref())))?;
|
||||
|
||||
let Results { found_words, documents_ids, nb_hits, limit, .. } = index.search(&search_query)?;
|
||||
|
||||
let fields_ids_map = index.fields_ids_map()?;
|
||||
|
||||
let displayed_fields = match index.displayed_fields_ids()? {
|
||||
Some(fields) => fields,
|
||||
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
||||
};
|
||||
|
||||
let attributes_to_highlight = match search_query.attributes_to_highlight {
|
||||
Some(fields) => fields.iter().map(ToOwned::to_owned).collect(),
|
||||
None => HashSet::new(),
|
||||
};
|
||||
|
||||
let stop_words = fst::Set::default();
|
||||
let highlighter = Highlighter::new(&stop_words);
|
||||
let mut documents = Vec::new();
|
||||
for (_id, obkv) in index.documents(&documents_ids)? {
|
||||
let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
|
||||
highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight);
|
||||
documents.push(object);
|
||||
}
|
||||
|
||||
let processing_time_ms = start.elapsed().as_millis();
|
||||
|
||||
let result = SearchResult {
|
||||
hits: documents,
|
||||
nb_hits,
|
||||
query: search_query.q.unwrap_or_default(),
|
||||
offset: search_query.offset.unwrap_or(0),
|
||||
limit,
|
||||
processing_time_ms,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
match self.index_controller.index(&index)? {
|
||||
Some(index) => Ok(search_query.perform(index)?),
|
||||
None => bail!("index {:?} doesn't exists", index.as_ref()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,15 +1,12 @@
|
||||
use std::ops::Deref;
|
||||
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||
//use milli::update_store::UpdateStatus;
|
||||
use async_compression::tokio_02::write::GzipEncoder;
|
||||
use futures_util::stream::StreamExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use super::Data;
|
||||
use crate::index_controller::IndexController;
|
||||
use crate::index_controller::{UpdateStatusResponse, Settings};
|
||||
|
||||
use crate::index_controller::{IndexController, UpdateStatusResponse, Settings};
|
||||
|
||||
impl Data {
|
||||
pub async fn add_documents<B, E, S>(
|
||||
@ -39,8 +36,8 @@ impl Data {
|
||||
let file = file.into_std().await;
|
||||
let mmap = unsafe { memmap::Mmap::map(&file)? };
|
||||
|
||||
let indexes = self.indexes.clone();
|
||||
let update = tokio::task::spawn_blocking(move ||indexes.add_documents(index, method, format, &mmap[..])).await??;
|
||||
let index_controller = self.index_controller.clone();
|
||||
let update = tokio::task::spawn_blocking(move ||index_controller.add_documents(index, method, format, &mmap[..])).await??;
|
||||
Ok(update.into())
|
||||
}
|
||||
|
||||
@ -49,7 +46,7 @@ impl Data {
|
||||
index: S,
|
||||
settings: Settings
|
||||
) -> anyhow::Result<UpdateStatusResponse> {
|
||||
let indexes = self.indexes.clone();
|
||||
let indexes = self.index_controller.clone();
|
||||
let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??;
|
||||
Ok(update.into())
|
||||
}
|
||||
|
17
src/index_controller/_update_store/mod.rs
Normal file
17
src/index_controller/_update_store/mod.rs
Normal file
@ -0,0 +1,17 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use heed::Env;
|
||||
|
||||
use super::IndexStore;
|
||||
|
||||
pub struct UpdateStore {
|
||||
env: Env,
|
||||
index_store: Arc<IndexStore>,
|
||||
}
|
||||
|
||||
impl UpdateStore {
|
||||
pub fn new(env: Env, index_store: Arc<IndexStore>) -> anyhow::Result<Self> {
|
||||
Ok(Self { env, index_store })
|
||||
}
|
||||
}
|
||||
|
@ -1,78 +1,12 @@
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use dashmap::DashMap;
|
||||
use heed::types::{Str, SerdeBincode};
|
||||
use heed::{EnvOpenOptions, Env, Database};
|
||||
use milli::{Index, FieldsIdsMap, SearchResult, FieldId, facet::FacetType};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use ouroboros::self_referencing;
|
||||
|
||||
use crate::data::SearchQuery;
|
||||
|
||||
const CONTROLLER_META_FILENAME: &str = "index_controller_meta";
|
||||
const INDEXES_CONTROLLER_FILENAME: &str = "indexes_db";
|
||||
const INDEXES_DB_NAME: &str = "indexes_db";
|
||||
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct IndexStoreMeta {
|
||||
open_options: EnvOpenOptions,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl IndexStoreMeta {
|
||||
fn from_path(path: impl AsRef<Path>) -> Result<Option<IndexStoreMeta>> {
|
||||
let mut path = path.as_ref().to_path_buf();
|
||||
path.push(CONTROLLER_META_FILENAME);
|
||||
if path.exists() {
|
||||
let mut file = File::open(path)?;
|
||||
let mut buffer = Vec::new();
|
||||
let n = file.read_to_end(&mut buffer)?;
|
||||
let meta: IndexStoreMeta = serde_json::from_slice(&buffer[..n])?;
|
||||
Ok(Some(meta))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn to_path(self, path: impl AsRef<Path>) -> Result<()> {
|
||||
let mut path = path.as_ref().to_path_buf();
|
||||
path.push(CONTROLLER_META_FILENAME);
|
||||
if path.exists() {
|
||||
Err(anyhow::anyhow!("Index controller metadata already exists"))
|
||||
} else {
|
||||
let mut file = File::create(path)?;
|
||||
let json = serde_json::to_vec(&self)?;
|
||||
file.write_all(&json)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct IndexMetadata {
|
||||
created_at: DateTime<Utc>,
|
||||
open_options: EnvOpenOptions,
|
||||
uuid: String,
|
||||
}
|
||||
|
||||
impl IndexMetadata {
|
||||
fn open_index(self, path: impl AsRef<Path>) -> Result<Index> {
|
||||
// create a path in the form "db_path/indexes/index_id"
|
||||
let mut path = path.as_ref().to_path_buf();
|
||||
path.push("indexes");
|
||||
path.push(&self.uuid);
|
||||
Ok(Index::new(self.open_options, path)?)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[self_referencing]
|
||||
pub struct IndexView {
|
||||
pub index: Arc<Index>,
|
||||
@ -136,120 +70,5 @@ impl IndexView {
|
||||
let index = self.borrow_index();
|
||||
Ok(index.documents(txn, ids.into_iter().copied())?)
|
||||
}
|
||||
|
||||
//pub async fn add_documents<B, E>(
|
||||
//&self,
|
||||
//method: IndexDocumentsMethod,
|
||||
//format: UpdateFormat,
|
||||
//mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
|
||||
//) -> anyhow::Result<UpdateStatusResponse>
|
||||
//where
|
||||
//B: Deref<Target = [u8]>,
|
||||
//E: std::error::Error + Send + Sync + 'static,
|
||||
//{
|
||||
//let file = tokio::task::spawn_blocking(tempfile::tempfile).await?;
|
||||
//let file = tokio::fs::File::from_std(file?);
|
||||
//let mut encoder = GzipEncoder::new(file);
|
||||
|
||||
//while let Some(result) = stream.next().await {
|
||||
//let bytes = &*result?;
|
||||
//encoder.write_all(&bytes[..]).await?;
|
||||
//}
|
||||
|
||||
//encoder.shutdown().await?;
|
||||
//let mut file = encoder.into_inner();
|
||||
//file.sync_all().await?;
|
||||
//let file = file.into_std().await;
|
||||
//let mmap = unsafe { memmap::Mmap::map(&file)? };
|
||||
|
||||
//let meta = UpdateMeta::DocumentsAddition { method, format };
|
||||
|
||||
//let index = self.index.clone();
|
||||
//let queue = self.update_store.clone();
|
||||
//let update = tokio::task::spawn_blocking(move || queue.register_update(index, meta, &mmap[..])).await??;
|
||||
//Ok(update.into())
|
||||
//}
|
||||
}
|
||||
|
||||
pub struct IndexStore {
|
||||
path: PathBuf,
|
||||
env: Env,
|
||||
indexes_db: Database<Str, SerdeBincode<IndexMetadata>>,
|
||||
indexes: DashMap<String, (String, Arc<Index>)>,
|
||||
}
|
||||
|
||||
impl IndexStore {
|
||||
/// Open the index controller from meta found at path, and create a new one if no meta is
|
||||
/// found.
|
||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
||||
// If index controller metadata is present, we return the env, otherwise, we create a new
|
||||
// metadata from scratch before returning a new env.
|
||||
let path = path.as_ref().to_path_buf();
|
||||
let env = match IndexStoreMeta::from_path(&path)? {
|
||||
Some(meta) => meta.open_options.open(INDEXES_CONTROLLER_FILENAME)?,
|
||||
None => {
|
||||
let mut open_options = EnvOpenOptions::new();
|
||||
open_options.map_size(page_size::get() * 1000);
|
||||
let env = open_options.open(INDEXES_CONTROLLER_FILENAME)?;
|
||||
let created_at = Utc::now();
|
||||
let meta = IndexStoreMeta { open_options: open_options.clone(), created_at };
|
||||
meta.to_path(&path)?;
|
||||
env
|
||||
}
|
||||
};
|
||||
let indexes = DashMap::new();
|
||||
let indexes_db = match env.open_database(Some(INDEXES_DB_NAME))? {
|
||||
Some(indexes_db) => indexes_db,
|
||||
None => env.create_database(Some(INDEXES_DB_NAME))?,
|
||||
};
|
||||
|
||||
Ok(Self { env, indexes, indexes_db, path })
|
||||
}
|
||||
|
||||
pub fn get_or_create<S: AsRef<str>>(&self, _name: S) -> Result<IndexView> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Get an index with read access to the db. The index are lazily loaded, meaning that we first
|
||||
/// check for its exixtence in the indexes map, and if it doesn't exist, the index db is check
|
||||
/// for metadata to launch the index.
|
||||
pub fn get<S: AsRef<str>>(&self, name: S) -> Result<Option<IndexView>> {
|
||||
match self.indexes.get(name.as_ref()) {
|
||||
Some(entry) => {
|
||||
let index = entry.1.clone();
|
||||
let uuid = entry.0.clone();
|
||||
let view = IndexView::try_new(index, |index| index.read_txn(), uuid)?;
|
||||
Ok(Some(view))
|
||||
}
|
||||
None => {
|
||||
let txn = self.env.read_txn()?;
|
||||
match self.indexes_db.get(&txn, name.as_ref())? {
|
||||
Some(meta) => {
|
||||
let uuid = meta.uuid.clone();
|
||||
let index = Arc::new(meta.open_index(&self.path)?);
|
||||
self.indexes.insert(name.as_ref().to_owned(), (uuid.clone(), index.clone()));
|
||||
let view = IndexView::try_new(index, |index| index.read_txn(), uuid)?;
|
||||
Ok(Some(view))
|
||||
}
|
||||
None => Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_mut<S: AsRef<str>>(&self, _name: S) -> Result<Option<IndexView>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub async fn delete_index<S: AsRef<str>>(&self, _name:S) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub async fn list_indices(&self) -> Result<Vec<(String, IndexMetadata)>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub async fn rename_index(&self, _old: &str, _new: &str) -> Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
188
src/index_controller/local_index_controller/index_store.rs
Normal file
188
src/index_controller/local_index_controller/index_store.rs
Normal file
@ -0,0 +1,188 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use dashmap::mapref::entry::Entry;
|
||||
use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn};
|
||||
use milli::Index;
|
||||
use rayon::ThreadPool;
|
||||
use uuid::Uuid;
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
use super::update_store::UpdateStore;
|
||||
use super::update_handler::UpdateHandler;
|
||||
use crate::option::IndexerOpts;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct IndexMeta {
|
||||
update_size: usize,
|
||||
index_size: usize,
|
||||
uid: Uuid,
|
||||
}
|
||||
|
||||
impl IndexMeta {
|
||||
fn open(
|
||||
&self,
|
||||
path: impl AsRef<Path>,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
opt: &IndexerOpts,
|
||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
||||
let update_path = make_update_db_path(&path, &self.uid);
|
||||
let index_path = make_index_db_path(&path, &self.uid);
|
||||
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(self.index_size);
|
||||
let index = Arc::new(Index::new(options, index_path)?);
|
||||
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(self.update_size);
|
||||
let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?;
|
||||
let update_store = UpdateStore::open(options, update_path, handler)?;
|
||||
Ok((index, update_store))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IndexStore {
|
||||
env: Env,
|
||||
name_to_uid: DashMap<String, Uuid>,
|
||||
name_to_uid_db: Database<Str, ByteSlice>,
|
||||
uid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
|
||||
uid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>,
|
||||
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
opt: IndexerOpts,
|
||||
}
|
||||
|
||||
impl IndexStore {
|
||||
pub fn new(path: impl AsRef<Path>, opt: IndexerOpts) -> anyhow::Result<Self> {
|
||||
let env = EnvOpenOptions::new()
|
||||
.map_size(4096 * 100)
|
||||
.max_dbs(2)
|
||||
.open(path)?;
|
||||
|
||||
let name_to_uid = DashMap::new();
|
||||
let uid_to_index = DashMap::new();
|
||||
let name_to_uid_db = open_or_create_database(&env, Some("name_to_uid"))?;
|
||||
let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?;
|
||||
|
||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(opt.indexing_jobs.unwrap_or(0))
|
||||
.build()?;
|
||||
let thread_pool = Arc::new(thread_pool);
|
||||
|
||||
Ok(Self {
|
||||
env,
|
||||
name_to_uid,
|
||||
name_to_uid_db,
|
||||
uid_to_index,
|
||||
uid_to_index_db,
|
||||
|
||||
thread_pool,
|
||||
opt,
|
||||
})
|
||||
}
|
||||
|
||||
fn index_uid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
|
||||
match self.name_to_uid.entry(name.as_ref().to_string()) {
|
||||
Entry::Vacant(entry) => {
|
||||
match self.name_to_uid_db.get(txn, name.as_ref())? {
|
||||
Some(bytes) => {
|
||||
let uuid = Uuid::from_slice(bytes)?;
|
||||
entry.insert(uuid);
|
||||
Ok(Some(uuid))
|
||||
}
|
||||
None => Ok(None)
|
||||
}
|
||||
}
|
||||
Entry::Occupied(entry) => Ok(Some(entry.get().clone())),
|
||||
}
|
||||
}
|
||||
|
||||
fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
|
||||
match self.uid_to_index.entry(uid.clone()) {
|
||||
Entry::Vacant(entry) => {
|
||||
match self.uid_to_index_db.get(txn, uid.as_bytes())? {
|
||||
Some(meta) => {
|
||||
let path = self.env.path();
|
||||
let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.opt)?;
|
||||
entry.insert((index.clone(), updates.clone()));
|
||||
Ok(Some((index, updates)))
|
||||
},
|
||||
None => Ok(None)
|
||||
}
|
||||
}
|
||||
Entry::Occupied(entry) => {
|
||||
let (index, updates) = entry.get();
|
||||
Ok(Some((index.clone(), updates.clone())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn _get_index(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
|
||||
match self.index_uid(&txn, name)? {
|
||||
Some(uid) => self.retrieve_index(&txn, uid),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
|
||||
let txn = self.env.read_txn()?;
|
||||
self._get_index(&txn, name)
|
||||
}
|
||||
|
||||
pub fn get_or_create_index(
|
||||
&self, name: impl AsRef<str>,
|
||||
update_size: usize,
|
||||
index_size: usize,
|
||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
||||
let mut txn = self.env.write_txn()?;
|
||||
match self._get_index(&txn, name.as_ref())? {
|
||||
Some(res) => Ok(res),
|
||||
None => {
|
||||
let uid = Uuid::new_v4();
|
||||
// TODO: clean in case of error
|
||||
Ok(self.create_index(&mut txn, uid, name, update_size, index_size)?)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn create_index( &self,
|
||||
txn: &mut RwTxn,
|
||||
uid: Uuid,
|
||||
name: impl AsRef<str>,
|
||||
update_size: usize,
|
||||
index_size: usize,
|
||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
||||
let meta = IndexMeta { update_size, index_size, uid: uid.clone() };
|
||||
|
||||
self.name_to_uid_db.put(txn, name.as_ref(), uid.as_bytes())?;
|
||||
self.uid_to_index_db.put(txn, uid.as_bytes(), &meta)?;
|
||||
|
||||
let path = self.env.path();
|
||||
let (index, update_store) = meta.open(path, self.thread_pool.clone(), &self.opt)?;
|
||||
|
||||
self.name_to_uid.insert(name.as_ref().to_string(), uid);
|
||||
self.uid_to_index.insert(uid, (index.clone(), update_store.clone()));
|
||||
|
||||
Ok((index, update_store))
|
||||
}
|
||||
}
|
||||
|
||||
fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>) -> anyhow::Result<Database<K, V>> {
|
||||
match env.open_database::<K, V>(name)? {
|
||||
Some(db) => Ok(db),
|
||||
None => Ok(env.create_database::<K, V>(name)?),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_update_db_path(path: impl AsRef<Path>, uid: &Uuid) -> PathBuf {
|
||||
let mut path = path.as_ref().to_path_buf();
|
||||
path.push(format!("update{}", uid));
|
||||
path
|
||||
}
|
||||
|
||||
fn make_index_db_path(path: impl AsRef<Path>, uid: &Uuid) -> PathBuf {
|
||||
let mut path = path.as_ref().to_path_buf();
|
||||
path.push(format!("index{}", uid));
|
||||
path
|
||||
}
|
57
src/index_controller/local_index_controller/mod.rs
Normal file
57
src/index_controller/local_index_controller/mod.rs
Normal file
@ -0,0 +1,57 @@
|
||||
mod update_store;
|
||||
mod index_store;
|
||||
mod update_handler;
|
||||
|
||||
use index_store::IndexStore;
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use milli::Index;
|
||||
|
||||
use crate::option::IndexerOpts;
|
||||
use super::IndexController;
|
||||
|
||||
pub struct LocalIndexController {
|
||||
indexes: IndexStore,
|
||||
}
|
||||
|
||||
impl LocalIndexController {
|
||||
pub fn new(path: impl AsRef<Path>, opt: IndexerOpts) -> anyhow::Result<Self> {
|
||||
let indexes = IndexStore::new(path, opt)?;
|
||||
Ok(Self { indexes })
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexController for LocalIndexController {
|
||||
fn add_documents<S: AsRef<str>>(
|
||||
&self,
|
||||
_index: S,
|
||||
_method: milli::update::IndexDocumentsMethod,
|
||||
_format: milli::update::UpdateFormat,
|
||||
_data: &[u8],
|
||||
) -> anyhow::Result<super::UpdateStatusResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn update_settings<S: AsRef<str>>(&self, _index_uid: S, _settings: super::Settings) -> anyhow::Result<super::UpdateStatusResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn create_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>> {
|
||||
let index = self.indexes.index(name)?.map(|(i, _)| i);
|
||||
Ok(index)
|
||||
}
|
||||
}
|
206
src/index_controller/local_index_controller/update_handler.rs
Normal file
206
src/index_controller/local_index_controller/update_handler.rs
Normal file
@ -0,0 +1,206 @@
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::Result;
|
||||
use flate2::read::GzDecoder;
|
||||
use grenad::CompressionType;
|
||||
use log::info;
|
||||
use milli::Index;
|
||||
use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod};
|
||||
use rayon::ThreadPool;
|
||||
|
||||
use crate::index_controller::updates::{Processing, Processed, Failed};
|
||||
use crate::index_controller::{UpdateResult, UpdateMeta, Settings, Facets};
|
||||
use crate::option::IndexerOpts;
|
||||
use super::update_store::HandleUpdate;
|
||||
|
||||
pub struct UpdateHandler {
|
||||
index: Arc<Index>,
|
||||
max_nb_chunks: Option<usize>,
|
||||
chunk_compression_level: Option<u32>,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
log_frequency: usize,
|
||||
max_memory: usize,
|
||||
linked_hash_map_size: usize,
|
||||
chunk_compression_type: CompressionType,
|
||||
chunk_fusing_shrink_size: u64,
|
||||
}
|
||||
|
||||
impl UpdateHandler {
|
||||
pub fn new(
|
||||
opt: &IndexerOpts,
|
||||
index: Arc<Index>,
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
index,
|
||||
max_nb_chunks: opt.max_nb_chunks,
|
||||
chunk_compression_level: opt.chunk_compression_level,
|
||||
thread_pool,
|
||||
log_frequency: opt.log_every_n,
|
||||
max_memory: opt.max_memory.get_bytes() as usize,
|
||||
linked_hash_map_size: opt.linked_hash_map_size,
|
||||
chunk_compression_type: opt.chunk_compression_type,
|
||||
chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
|
||||
})
|
||||
}
|
||||
|
||||
fn update_buidler(&self, update_id: u64) -> UpdateBuilder {
|
||||
// We prepare the update by using the update builder.
|
||||
let mut update_builder = UpdateBuilder::new(update_id);
|
||||
if let Some(max_nb_chunks) = self.max_nb_chunks {
|
||||
update_builder.max_nb_chunks(max_nb_chunks);
|
||||
}
|
||||
if let Some(chunk_compression_level) = self.chunk_compression_level {
|
||||
update_builder.chunk_compression_level(chunk_compression_level);
|
||||
}
|
||||
update_builder.thread_pool(&self.thread_pool);
|
||||
update_builder.log_every_n(self.log_frequency);
|
||||
update_builder.max_memory(self.max_memory);
|
||||
update_builder.linked_hash_map_size(self.linked_hash_map_size);
|
||||
update_builder.chunk_compression_type(self.chunk_compression_type);
|
||||
update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
|
||||
update_builder
|
||||
}
|
||||
|
||||
fn update_documents(
|
||||
&self,
|
||||
format: UpdateFormat,
|
||||
method: IndexDocumentsMethod,
|
||||
content: &[u8],
|
||||
update_builder: UpdateBuilder,
|
||||
) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.index.write_txn()?;
|
||||
let mut builder = update_builder.index_documents(&mut wtxn, &self.index);
|
||||
builder.update_format(format);
|
||||
builder.index_documents_method(method);
|
||||
|
||||
let gzipped = true;
|
||||
let reader = if gzipped {
|
||||
Box::new(GzDecoder::new(content))
|
||||
} else {
|
||||
Box::new(content) as Box<dyn io::Read>
|
||||
};
|
||||
|
||||
let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
|
||||
|
||||
match result {
|
||||
Ok(addition_result) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::DocumentsAddition(addition_result)))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.index.write_txn()?;
|
||||
let builder = update_builder.clear_documents(&mut wtxn, &self.index);
|
||||
|
||||
match builder.execute() {
|
||||
Ok(_count) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.index.write_txn()?;
|
||||
let mut builder = update_builder.settings(&mut wtxn, &self.index);
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref names) = settings.searchable_attributes {
|
||||
match names {
|
||||
Some(names) => builder.set_searchable_fields(names.clone()),
|
||||
None => builder.reset_searchable_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref names) = settings.displayed_attributes {
|
||||
match names {
|
||||
Some(names) => builder.set_displayed_fields(names.clone()),
|
||||
None => builder.reset_displayed_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref facet_types) = settings.faceted_attributes {
|
||||
let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new());
|
||||
builder.set_faceted_fields(facet_types);
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref criteria) = settings.criteria {
|
||||
match criteria {
|
||||
Some(criteria) => builder.set_criteria(criteria.clone()),
|
||||
None => builder.reset_criteria(),
|
||||
}
|
||||
}
|
||||
|
||||
let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
|
||||
|
||||
match result {
|
||||
Ok(()) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
fn update_facets(
|
||||
&self,
|
||||
levels: &Facets,
|
||||
update_builder: UpdateBuilder
|
||||
) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.index.write_txn()?;
|
||||
let mut builder = update_builder.facets(&mut wtxn, &self.index);
|
||||
if let Some(value) = levels.level_group_size {
|
||||
builder.level_group_size(value);
|
||||
}
|
||||
if let Some(value) = levels.min_level_size {
|
||||
builder.min_level_size(value);
|
||||
}
|
||||
match builder.execute() {
|
||||
Ok(()) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
|
||||
fn handle_update(
|
||||
&mut self,
|
||||
update_id: u64,
|
||||
meta: Processing<UpdateMeta>,
|
||||
content: &[u8]
|
||||
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
|
||||
use UpdateMeta::*;
|
||||
|
||||
let update_builder = self.update_buidler(update_id);
|
||||
|
||||
let result = match meta.meta() {
|
||||
DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder),
|
||||
ClearDocuments => self.clear_documents(update_builder),
|
||||
Settings(settings) => self.update_settings(settings, update_builder),
|
||||
Facets(levels) => self.update_facets(levels, update_builder),
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(result) => Ok(meta.process(result)),
|
||||
Err(e) => Err(meta.fail(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
311
src/index_controller/local_index_controller/update_store.rs
Normal file
311
src/index_controller/local_index_controller/update_store.rs
Normal file
@ -0,0 +1,311 @@
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use crossbeam_channel::Sender;
|
||||
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
|
||||
use heed::{EnvOpenOptions, Env, Database};
|
||||
|
||||
use crate::index_controller::updates::*;
|
||||
use crate::index_controller::{UpdateMeta, UpdateResult};
|
||||
|
||||
type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UpdateStore {
|
||||
env: Env,
|
||||
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<UpdateMeta>>>,
|
||||
pending: Database<OwnedType<BEU64>, ByteSlice>,
|
||||
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<UpdateMeta, UpdateResult>>>,
|
||||
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<UpdateMeta, String>>>,
|
||||
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<UpdateMeta>>>,
|
||||
processing: Arc<RwLock<Option<Processing<UpdateMeta>>>>,
|
||||
notification_sender: Sender<()>,
|
||||
}
|
||||
|
||||
pub trait HandleUpdate<M, N, E> {
|
||||
fn handle_update(&mut self, update_id: u64, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>>;
|
||||
}
|
||||
|
||||
impl UpdateStore {
|
||||
pub fn open<P, U>(
|
||||
mut options: EnvOpenOptions,
|
||||
path: P,
|
||||
mut update_handler: U,
|
||||
) -> heed::Result<Arc<UpdateStore>>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
U: HandleUpdate<UpdateMeta, UpdateResult, String> + Send + 'static,
|
||||
{
|
||||
options.max_dbs(5);
|
||||
|
||||
let env = options.open(path)?;
|
||||
let pending_meta = env.create_database(Some("pending-meta"))?;
|
||||
let pending = env.create_database(Some("pending"))?;
|
||||
let processed_meta = env.create_database(Some("processed-meta"))?;
|
||||
let aborted_meta = env.create_database(Some("aborted-meta"))?;
|
||||
let failed_meta = env.create_database(Some("failed-meta"))?;
|
||||
let processing = Arc::new(RwLock::new(None));
|
||||
|
||||
let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
|
||||
// Send a first notification to trigger the process.
|
||||
let _ = notification_sender.send(());
|
||||
|
||||
let update_store = Arc::new(UpdateStore {
|
||||
env,
|
||||
pending,
|
||||
pending_meta,
|
||||
processed_meta,
|
||||
aborted_meta,
|
||||
notification_sender,
|
||||
failed_meta,
|
||||
processing,
|
||||
});
|
||||
|
||||
let update_store_cloned = update_store.clone();
|
||||
std::thread::spawn(move || {
|
||||
// Block and wait for something to process.
|
||||
for () in notification_receiver {
|
||||
loop {
|
||||
match update_store_cloned.process_pending_update(&mut update_handler) {
|
||||
Ok(Some(_)) => (),
|
||||
Ok(None) => break,
|
||||
Err(e) => eprintln!("error while processing update: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(update_store)
|
||||
}
|
||||
|
||||
/// Returns the new biggest id to use to store the new update.
|
||||
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
|
||||
let last_pending = self.pending_meta
|
||||
.remap_data_type::<DecodeIgnore>()
|
||||
.last(txn)?
|
||||
.map(|(k, _)| k.get());
|
||||
|
||||
let last_processed = self.processed_meta
|
||||
.remap_data_type::<DecodeIgnore>()
|
||||
.last(txn)?
|
||||
.map(|(k, _)| k.get());
|
||||
|
||||
let last_aborted = self.aborted_meta
|
||||
.remap_data_type::<DecodeIgnore>()
|
||||
.last(txn)?
|
||||
.map(|(k, _)| k.get());
|
||||
|
||||
let last_update_id = [last_pending, last_processed, last_aborted]
|
||||
.iter()
|
||||
.copied()
|
||||
.flatten()
|
||||
.max();
|
||||
|
||||
match last_update_id {
|
||||
Some(last_id) => Ok(last_id + 1),
|
||||
None => Ok(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers the update content in the pending store and the meta
|
||||
/// into the pending-meta store. Returns the new unique update id.
|
||||
pub fn register_update(
|
||||
&self,
|
||||
meta: UpdateMeta,
|
||||
content: &[u8]
|
||||
) -> heed::Result<Pending<UpdateMeta>> {
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
|
||||
// We ask the update store to give us a new update id, this is safe,
|
||||
// no other update can have the same id because we use a write txn before
|
||||
// asking for the id and registering it so other update registering
|
||||
// will be forced to wait for a new write txn.
|
||||
let update_id = self.new_update_id(&wtxn)?;
|
||||
let update_key = BEU64::new(update_id);
|
||||
|
||||
let meta = Pending::new(meta, update_id);
|
||||
self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
|
||||
self.pending.put(&mut wtxn, &update_key, content)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
if let Err(e) = self.notification_sender.try_send(()) {
|
||||
assert!(!e.is_disconnected(), "update notification channel is disconnected");
|
||||
}
|
||||
Ok(meta)
|
||||
}
|
||||
/// Executes the user provided function on the next pending update (the one with the lowest id).
|
||||
/// This is asynchronous as it let the user process the update with a read-only txn and
|
||||
/// only writing the result meta to the processed-meta store *after* it has been processed.
|
||||
fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<()>>
|
||||
where
|
||||
U: HandleUpdate<UpdateMeta, UpdateResult, String> + Send + 'static,
|
||||
{
|
||||
// Create a read transaction to be able to retrieve the pending update in order.
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let first_meta = self.pending_meta.first(&rtxn)?;
|
||||
|
||||
// If there is a pending update we process and only keep
|
||||
// a reader while processing it, not a writer.
|
||||
match first_meta {
|
||||
Some((first_id, pending)) => {
|
||||
let first_content = self.pending
|
||||
.get(&rtxn, &first_id)?
|
||||
.expect("associated update content");
|
||||
|
||||
// we cahnge the state of the update from pending to processing before we pass it
|
||||
// to the update handler. Processing store is non persistent to be able recover
|
||||
// from a failure
|
||||
let processing = pending.processing();
|
||||
self.processing
|
||||
.write()
|
||||
.unwrap()
|
||||
.replace(processing.clone());
|
||||
// Process the pending update using the provided user function.
|
||||
let result = handler.handle_update(first_id.get(), processing, first_content);
|
||||
drop(rtxn);
|
||||
|
||||
// Once the pending update have been successfully processed
|
||||
// we must remove the content from the pending and processing stores and
|
||||
// write the *new* meta to the processed-meta store and commit.
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
self.processing
|
||||
.write()
|
||||
.unwrap()
|
||||
.take();
|
||||
self.pending_meta.delete(&mut wtxn, &first_id)?;
|
||||
self.pending.delete(&mut wtxn, &first_id)?;
|
||||
match result {
|
||||
Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
|
||||
Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
|
||||
}
|
||||
wtxn.commit()?;
|
||||
|
||||
Ok(Some(()))
|
||||
},
|
||||
None => Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// The id and metadata of the update that is currently being processed,
|
||||
/// `None` if no update is being processed.
|
||||
pub fn processing_update(&self) -> heed::Result<Option<(u64, Pending<UpdateMeta>)>> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
match self.pending_meta.first(&rtxn)? {
|
||||
Some((key, meta)) => Ok(Some((key.get(), meta))),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the user defined function with the meta-store iterators, the first
|
||||
/// iterator is the *processed* meta one, the second the *aborted* meta one
|
||||
/// and, the last is the *pending* meta one.
|
||||
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
|
||||
where
|
||||
F: for<'a> FnMut(
|
||||
Option<Processing<UpdateMeta>>,
|
||||
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<UpdateMeta, UpdateResult>>>,
|
||||
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<UpdateMeta>>>,
|
||||
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<UpdateMeta>>>,
|
||||
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<UpdateMeta, String>>>,
|
||||
) -> heed::Result<T>,
|
||||
{
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
||||
// We get the pending, processed and aborted meta iterators.
|
||||
let processed_iter = self.processed_meta.iter(&rtxn)?;
|
||||
let aborted_iter = self.aborted_meta.iter(&rtxn)?;
|
||||
let pending_iter = self.pending_meta.iter(&rtxn)?;
|
||||
let processing = self.processing.read().unwrap().clone();
|
||||
let failed_iter = self.failed_meta.iter(&rtxn)?;
|
||||
|
||||
// We execute the user defined function with both iterators.
|
||||
(f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter)
|
||||
}
|
||||
|
||||
/// Returns the update associated meta or `None` if the update doesn't exist.
|
||||
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let key = BEU64::new(update_id);
|
||||
|
||||
if let Some(ref meta) = *self.processing.read().unwrap() {
|
||||
if meta.id() == update_id {
|
||||
return Ok(Some(UpdateStatus::Processing(meta.clone())));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
|
||||
return Ok(Some(UpdateStatus::Pending(meta)));
|
||||
}
|
||||
|
||||
if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
|
||||
return Ok(Some(UpdateStatus::Processed(meta)));
|
||||
}
|
||||
|
||||
if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
|
||||
return Ok(Some(UpdateStatus::Aborted(meta)));
|
||||
}
|
||||
|
||||
if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
|
||||
return Ok(Some(UpdateStatus::Failed(meta)));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Aborts an update, an aborted update content is deleted and
|
||||
/// the meta of it is moved into the aborted updates database.
|
||||
///
|
||||
/// Trying to abort an update that is currently being processed, an update
|
||||
/// that as already been processed or which doesn't actually exist, will
|
||||
/// return `None`.
|
||||
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<UpdateMeta>>> {
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let key = BEU64::new(update_id);
|
||||
|
||||
// We cannot abort an update that is currently being processed.
|
||||
if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let pending = match self.pending_meta.get(&wtxn, &key)? {
|
||||
Some(meta) => meta,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
let aborted = pending.abort();
|
||||
|
||||
self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
|
||||
self.pending_meta.delete(&mut wtxn, &key)?;
|
||||
self.pending.delete(&mut wtxn, &key)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
Ok(Some(aborted))
|
||||
}
|
||||
|
||||
/// Aborts all the pending updates, and not the one being currently processed.
|
||||
/// Returns the update metas and ids that were successfully aborted.
|
||||
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<UpdateMeta>)>> {
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let mut aborted_updates = Vec::new();
|
||||
|
||||
// We skip the first pending update as it is currently being processed.
|
||||
for result in self.pending_meta.iter(&wtxn)?.skip(1) {
|
||||
let (key, pending) = result?;
|
||||
let id = key.get();
|
||||
aborted_updates.push((id, pending.abort()));
|
||||
}
|
||||
|
||||
for (id, aborted) in &aborted_updates {
|
||||
let key = BEU64::new(*id);
|
||||
self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
|
||||
self.pending_meta.delete(&mut wtxn, &key)?;
|
||||
self.pending.delete(&mut wtxn, &key)?;
|
||||
}
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
Ok(aborted_updates)
|
||||
}
|
||||
}
|
@ -1,18 +1,19 @@
|
||||
mod index_store;
|
||||
mod update_store;
|
||||
mod local_index_controller;
|
||||
mod updates;
|
||||
|
||||
pub use index_store::IndexStore;
|
||||
pub use update_store::UpdateStore;
|
||||
pub use local_index_controller::LocalIndexController;
|
||||
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Deref;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||
use milli::update_store::{Processed, Processing, Failed, Pending, Aborted};
|
||||
use milli::Index;
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
||||
use serde::{Serialize, Deserialize, de::Deserializer};
|
||||
|
||||
use updates::{Processed, Processing, Failed, Pending, Aborted};
|
||||
|
||||
pub type UpdateStatusResponse = UpdateStatus<UpdateMeta, UpdateResult, String>;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@ -89,7 +90,7 @@ impl Settings {
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateResult {
|
||||
//DocumentsAddition(DocumentAdditionResult),
|
||||
DocumentsAddition(DocumentAdditionResult),
|
||||
Other,
|
||||
}
|
||||
|
||||
@ -97,7 +98,7 @@ pub enum UpdateResult {
|
||||
/// for read access which is provided, and write access which must be provided. This allows the
|
||||
/// implementer to define the behaviour of write accesses to the indices, and abstract the
|
||||
/// scheduling of the updates. The implementer must be able to provide an instance of `IndexStore`
|
||||
pub trait IndexController: Deref<Target = IndexStore> {
|
||||
pub trait IndexController {
|
||||
|
||||
/*
|
||||
* Write operations
|
||||
@ -141,5 +142,7 @@ pub trait IndexController: Deref<Target = IndexStore> {
|
||||
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns, if it exists, an `IndexView` to the requested index.
|
||||
fn index(&self, uid: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>;
|
||||
}
|
||||
|
@ -1,49 +0,0 @@
|
||||
use std::ops::Deref;
|
||||
|
||||
use super::{IndexStore, IndexController};
|
||||
|
||||
pub struct UpdateStore {
|
||||
index_store: IndexStore,
|
||||
}
|
||||
|
||||
impl Deref for UpdateStore {
|
||||
type Target = IndexStore;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.index_store
|
||||
}
|
||||
}
|
||||
|
||||
impl UpdateStore {
|
||||
pub fn new(index_store: IndexStore) -> Self {
|
||||
Self { index_store }
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexController for UpdateStore {
|
||||
fn add_documents<S: AsRef<str>>(
|
||||
&self,
|
||||
_index: S,
|
||||
_method: milli::update::IndexDocumentsMethod,
|
||||
_format: milli::update::UpdateFormat,
|
||||
_data: &[u8],
|
||||
) -> anyhow::Result<crate::index_controller::UpdateStatusResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn update_settings<S: AsRef<str>>(&self, _index_uid: S, _settings: crate::index_controller::Settings) -> anyhow::Result<crate::index_controller::UpdateStatusResponse> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn create_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
}
|
||||
}
|
167
src/index_controller/updates.rs
Normal file
167
src/index_controller/updates.rs
Normal file
@ -0,0 +1,167 @@
|
||||
use chrono::{Utc, DateTime};
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
|
||||
pub struct Pending<M> {
|
||||
update_id: u64,
|
||||
meta: M,
|
||||
enqueued_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl<M> Pending<M> {
|
||||
pub fn new(meta: M, update_id: u64) -> Self {
|
||||
Self {
|
||||
enqueued_at: Utc::now(),
|
||||
meta,
|
||||
update_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn processing(self) -> Processing<M> {
|
||||
Processing {
|
||||
from: self,
|
||||
started_processing_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn abort(self) -> Aborted<M> {
|
||||
Aborted {
|
||||
from: self,
|
||||
aborted_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> &M {
|
||||
&self.meta
|
||||
}
|
||||
|
||||
pub fn id(&self) -> u64 {
|
||||
self.update_id
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
|
||||
pub struct Processed<M, N> {
|
||||
success: N,
|
||||
processed_at: DateTime<Utc>,
|
||||
#[serde(flatten)]
|
||||
from: Processing<M>,
|
||||
}
|
||||
|
||||
impl<M, N> Processed<M, N> {
|
||||
pub fn id(&self) -> u64 {
|
||||
self.from.id()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
|
||||
pub struct Processing<M> {
|
||||
#[serde(flatten)]
|
||||
from: Pending<M>,
|
||||
started_processing_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl<M> Processing<M> {
|
||||
pub fn id(&self) -> u64 {
|
||||
self.from.id()
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> &M {
|
||||
self.from.meta()
|
||||
}
|
||||
|
||||
pub fn process<N>(self, meta: N) -> Processed<M, N> {
|
||||
Processed {
|
||||
success: meta,
|
||||
from: self,
|
||||
processed_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fail<E>(self, error: E) -> Failed<M, E> {
|
||||
Failed {
|
||||
from: self,
|
||||
error,
|
||||
failed_at: Utc::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
|
||||
pub struct Aborted<M> {
|
||||
#[serde(flatten)]
|
||||
from: Pending<M>,
|
||||
aborted_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl<M> Aborted<M> {
|
||||
pub fn id(&self) -> u64 {
|
||||
self.from.id()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
|
||||
pub struct Failed<M, E> {
|
||||
#[serde(flatten)]
|
||||
from: Processing<M>,
|
||||
error: E,
|
||||
failed_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl<M, E> Failed<M, E> {
|
||||
pub fn id(&self) -> u64 {
|
||||
self.from.id()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Serialize)]
|
||||
#[serde(tag = "status")]
|
||||
pub enum UpdateStatus<M, N, E> {
|
||||
Processing(Processing<M>),
|
||||
Pending(Pending<M>),
|
||||
Processed(Processed<M, N>),
|
||||
Aborted(Aborted<M>),
|
||||
Failed(Failed<M, E>),
|
||||
}
|
||||
|
||||
impl<M, N, E> UpdateStatus<M, N, E> {
|
||||
pub fn id(&self) -> u64 {
|
||||
match self {
|
||||
UpdateStatus::Processing(u) => u.id(),
|
||||
UpdateStatus::Pending(u) => u.id(),
|
||||
UpdateStatus::Processed(u) => u.id(),
|
||||
UpdateStatus::Aborted(u) => u.id(),
|
||||
UpdateStatus::Failed(u) => u.id(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, N, E> From<Pending<M>> for UpdateStatus<M, N, E> {
|
||||
fn from(other: Pending<M>) -> Self {
|
||||
Self::Pending(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, N, E> From<Aborted<M>> for UpdateStatus<M, N, E> {
|
||||
fn from(other: Aborted<M>) -> Self {
|
||||
Self::Aborted(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, N, E> From<Processed<M, N>> for UpdateStatus<M, N, E> {
|
||||
fn from(other: Processed<M, N>) -> Self {
|
||||
Self::Processed(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, N, E> From<Processing<M>> for UpdateStatus<M, N, E> {
|
||||
fn from(other: Processing<M>) -> Self {
|
||||
Self::Processing(other)
|
||||
}
|
||||
}
|
||||
|
||||
impl<M, N, E> From<Failed<M, E>> for UpdateStatus<M, N, E> {
|
||||
fn from(other: Failed<M, E>) -> Self {
|
||||
Self::Failed(other)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user