MeiliSearch/meilidb-core/src/update/mod.rs

224 lines
6.6 KiB
Rust
Raw Normal View History

2019-10-11 15:33:35 +02:00
mod customs_update;
2019-10-03 15:04:11 +02:00
mod documents_addition;
mod documents_deletion;
mod schema_update;
2019-10-08 17:06:56 +02:00
mod synonyms_addition;
2019-10-08 17:16:48 +02:00
mod synonyms_deletion;
2019-10-03 15:04:11 +02:00
2019-10-11 15:33:35 +02:00
pub use self::customs_update::{apply_customs_update, push_customs_update};
2019-10-18 13:05:28 +02:00
pub use self::documents_addition::{apply_documents_addition, DocumentsAddition};
pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion};
pub use self::schema_update::{apply_schema_update, push_schema_update};
2019-10-18 13:05:28 +02:00
pub use self::synonyms_addition::{apply_synonyms_addition, SynonymsAddition};
pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion};
2019-10-03 15:04:11 +02:00
use std::cmp;
2019-10-18 13:05:28 +02:00
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use log::debug;
2019-10-18 13:05:28 +02:00
use serde::{Deserialize, Serialize};
2019-10-16 17:05:24 +02:00
use zlmdb::Result as ZResult;
2019-10-18 13:05:28 +02:00
use crate::{store, DocumentId, MResult, RankedMap};
use meilidb_schema::Schema;
2019-10-03 15:04:11 +02:00
2019-10-16 17:05:24 +02:00
#[derive(Debug, Clone, Serialize, Deserialize)]
2019-10-03 15:04:11 +02:00
pub enum Update {
Schema(Schema),
2019-10-11 15:33:35 +02:00
Customs(Vec<u8>),
2019-10-11 16:16:21 +02:00
DocumentsAddition(Vec<serde_json::Value>),
2019-10-03 15:04:11 +02:00
DocumentsDeletion(Vec<DocumentId>),
2019-10-08 17:06:56 +02:00
SynonymsAddition(BTreeMap<String, Vec<String>>),
2019-10-08 17:16:48 +02:00
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
2019-10-03 15:04:11 +02:00
}
#[derive(Debug, Clone, Serialize, Deserialize)]
2019-10-03 16:13:09 +02:00
pub enum UpdateType {
Schema { schema: Schema },
2019-10-11 15:33:35 +02:00
Customs,
2019-10-03 16:13:09 +02:00
DocumentsAddition { number: usize },
DocumentsDeletion { number: usize },
2019-10-08 17:06:56 +02:00
SynonymsAddition { number: usize },
2019-10-08 17:16:48 +02:00
SynonymsDeletion { number: usize },
2019-10-03 16:13:09 +02:00
}
#[derive(Clone, Serialize, Deserialize)]
pub struct DetailedDuration {
pub main: Duration,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct UpdateResult {
pub update_id: u64,
pub update_type: UpdateType,
pub result: Result<(), String>,
pub detailed_duration: DetailedDuration,
}
2019-10-03 16:54:37 +02:00
#[derive(Clone, Serialize, Deserialize)]
pub enum UpdateStatus {
Enqueued,
Processed(UpdateResult),
Unknown,
}
2019-10-16 17:05:24 +02:00
pub fn update_status(
reader: &zlmdb::RoTxn,
2019-10-03 16:54:37 +02:00
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
update_id: u64,
2019-10-18 13:05:28 +02:00
) -> MResult<UpdateStatus> {
2019-10-03 16:54:37 +02:00
match updates_results_store.update_result(reader, update_id)? {
Some(result) => Ok(UpdateStatus::Processed(result)),
None => {
if updates_store.contains(reader, update_id)? {
Ok(UpdateStatus::Enqueued)
} else {
Ok(UpdateStatus::Unknown)
}
}
}
}
pub fn next_update_id(
2019-10-16 17:05:24 +02:00
writer: &mut zlmdb::RwTxn,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
2019-10-18 13:05:28 +02:00
) -> ZResult<u64> {
let last_update_id = updates_store.last_update_id(writer)?;
let last_update_id = last_update_id.map(|(n, _)| n);
let last_update_results_id = updates_results_store.last_update_id(writer)?;
let last_update_results_id = last_update_results_id.map(|(n, _)| n);
let max_update_id = cmp::max(last_update_id, last_update_results_id);
let new_update_id = max_update_id.map_or(0, |n| n + 1);
2019-10-08 17:16:48 +02:00
Ok(new_update_id)
2019-10-08 17:16:48 +02:00
}
2019-10-18 13:05:28 +02:00
pub fn update_task(
writer: &mut zlmdb::RwTxn,
index: store::Index,
) -> MResult<Option<UpdateResult>> {
let (update_id, update) = match index.updates.pop_front(writer)? {
Some(value) => value,
2019-10-09 17:23:48 +02:00
None => return Ok(None),
};
debug!("Processing update number {}", update_id);
let (update_type, result, duration) = match update {
Update::Schema(schema) => {
let start = Instant::now();
2019-10-09 11:45:19 +02:00
2019-10-18 13:05:28 +02:00
let update_type = UpdateType::Schema {
schema: schema.clone(),
};
let result = apply_schema_update(writer, index.main, &schema);
(update_type, result, start.elapsed())
2019-10-18 13:05:28 +02:00
}
2019-10-11 15:33:35 +02:00
Update::Customs(customs) => {
let start = Instant::now();
let update_type = UpdateType::Customs;
2019-10-16 17:05:24 +02:00
let result = apply_customs_update(writer, index.main, &customs).map_err(Into::into);
2019-10-11 15:33:35 +02:00
(update_type, result, start.elapsed())
}
Update::DocumentsAddition(documents) => {
let start = Instant::now();
let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
2019-10-18 13:05:28 +02:00
let update_type = UpdateType::DocumentsAddition {
number: documents.len(),
};
let result = apply_documents_addition(
writer,
index.main,
index.documents_fields,
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
ranked_map,
documents,
);
(update_type, result, start.elapsed())
2019-10-18 13:05:28 +02:00
}
Update::DocumentsDeletion(documents) => {
let start = Instant::now();
let ranked_map = match index.main.ranked_map(writer)? {
Some(ranked_map) => ranked_map,
None => RankedMap::default(),
};
2019-10-18 13:05:28 +02:00
let update_type = UpdateType::DocumentsDeletion {
number: documents.len(),
};
let result = apply_documents_deletion(
writer,
index.main,
index.documents_fields,
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
ranked_map,
documents,
);
2019-10-08 17:06:56 +02:00
(update_type, result, start.elapsed())
2019-10-18 13:05:28 +02:00
}
2019-10-08 17:06:56 +02:00
Update::SynonymsAddition(synonyms) => {
let start = Instant::now();
2019-10-18 13:05:28 +02:00
let update_type = UpdateType::SynonymsAddition {
number: synonyms.len(),
};
2019-10-08 17:06:56 +02:00
2019-10-18 13:05:28 +02:00
let result = apply_synonyms_addition(writer, index.main, index.synonyms, synonyms);
2019-10-08 17:06:56 +02:00
2019-10-08 17:16:48 +02:00
(update_type, result, start.elapsed())
2019-10-18 13:05:28 +02:00
}
2019-10-08 17:16:48 +02:00
Update::SynonymsDeletion(synonyms) => {
let start = Instant::now();
2019-10-18 13:05:28 +02:00
let update_type = UpdateType::SynonymsDeletion {
number: synonyms.len(),
};
2019-10-08 17:16:48 +02:00
2019-10-18 13:05:28 +02:00
let result = apply_synonyms_deletion(writer, index.main, index.synonyms, synonyms);
2019-10-08 17:16:48 +02:00
(update_type, result, start.elapsed())
2019-10-18 13:05:28 +02:00
}
};
2019-10-18 13:05:28 +02:00
debug!(
"Processed update number {} {:?} {:?}",
update_id, update_type, result
);
let detailed_duration = DetailedDuration { main: duration };
let status = UpdateResult {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),
detailed_duration,
};
2019-10-18 13:05:28 +02:00
index
.updates_results
.put_update_result(writer, update_id, &status)?;
2019-10-09 17:23:48 +02:00
Ok(Some(status))
2019-10-03 15:04:11 +02:00
}