Move the push update functions to their related modules

This commit is contained in:
Clément Renault 2019-10-08 17:24:11 +02:00
parent 6ee0d72c7b
commit 0a5ad4db06
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 94 additions and 106 deletions

View File

@ -8,7 +8,7 @@ use serde::Serialize;
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::store; use crate::store;
use crate::update::{push_documents_addition, apply_documents_deletion}; use crate::update::{Update, next_update_id, apply_documents_deletion};
use crate::{MResult, Error, RankedMap}; use crate::{MResult, Error, RankedMap};
pub struct DocumentsAddition<D> { pub struct DocumentsAddition<D> {
@ -59,6 +59,28 @@ impl<D> Extend<D> for DocumentsAddition<D> {
} }
} }
pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: Vec<D>,
) -> MResult<u64>
{
let mut values = Vec::with_capacity(addition.len());
for add in addition {
let vec = rmp_serde::to_vec_named(&add)?;
let add = rmp_serde::from_read(&vec[..])?;
values.push(add);
}
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::DocumentsAddition(values);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn apply_documents_addition( pub fn apply_documents_addition(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
main_store: store::Main, main_store: store::Main,

View File

@ -6,7 +6,7 @@ use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
use crate::{DocumentId, RankedMap, MResult, Error}; use crate::{DocumentId, RankedMap, MResult, Error};
use crate::serde::extract_document_id; use crate::serde::extract_document_id;
use crate::update::push_documents_deletion; use crate::update::{Update, next_update_id};
use crate::store; use crate::store;
pub struct DocumentsDeletion { pub struct DocumentsDeletion {
@ -69,6 +69,21 @@ impl Extend<DocumentId> for DocumentsDeletion {
} }
} }
pub fn push_documents_deletion(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: Vec<DocumentId>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::DocumentsDeletion(deletion);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn apply_documents_deletion( pub fn apply_documents_deletion(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
main_store: store::Main, main_store: store::Main,

View File

@ -6,12 +6,13 @@ mod synonyms_deletion;
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
pub use self::schema_update::apply_schema_update; pub use self::schema_update::{apply_schema_update, push_schema_update};
pub use self::synonyms_addition::{SynonymsAddition, apply_synonyms_addition}; pub use self::synonyms_addition::{SynonymsAddition, apply_synonyms_addition};
pub use self::synonyms_deletion::{SynonymsDeletion, apply_synonyms_deletion}; pub use self::synonyms_deletion::{SynonymsDeletion, apply_synonyms_deletion};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::cmp;
use log::debug; use log::debug;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -77,11 +78,11 @@ pub fn update_status<T: rkv::Readable>(
} }
} }
fn biggest_update_id( pub fn next_update_id(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
) -> MResult<Option<u64>> ) -> MResult<u64>
{ {
let last_update_id = updates_store.last_update_id(writer)?; let last_update_id = updates_store.last_update_id(writer)?;
let last_update_id = last_update_id.map(|(n, _)| n); let last_update_id = last_update_id.map(|(n, _)| n);
@ -89,106 +90,10 @@ fn biggest_update_id(
let last_update_results_id = updates_results_store.last_update_id(writer)?; let last_update_results_id = updates_results_store.last_update_id(writer)?;
let last_update_results_id = last_update_results_id.map(|(n, _)| n); let last_update_results_id = last_update_results_id.map(|(n, _)| n);
let max = last_update_id.max(last_update_results_id); let max_update_id = cmp::max(last_update_id, last_update_results_id);
let new_update_id = max_update_id.map_or(0, |n| n + 1);
Ok(max) Ok(new_update_id)
}
pub fn next_update_id(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
) -> MResult<u64>
{
let last_update_id = biggest_update_id(
writer,
updates_store,
updates_results_store
)?;
Ok(last_update_id.map_or(0, |n| n + 1))
}
pub fn push_schema_update(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
schema: Schema,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SchemaUpdate(schema);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: Vec<D>,
) -> MResult<u64>
{
let mut values = Vec::with_capacity(addition.len());
for add in addition {
let vec = rmp_serde::to_vec_named(&add)?;
let add = rmp_serde::from_read(&vec[..])?;
values.push(add);
}
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::DocumentsAddition(values);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn push_documents_deletion(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: Vec<DocumentId>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::DocumentsDeletion(deletion);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn push_synonyms_addition(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: BTreeMap<String, Vec<String>>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SynonymsAddition(addition);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn push_synonyms_deletion(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: BTreeMap<String, Option<Vec<String>>>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SynonymsDeletion(deletion);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
} }
pub fn update_task( pub fn update_task(

View File

@ -1,5 +1,6 @@
use meilidb_schema::Schema; use meilidb_schema::Schema;
use crate::{store, error::UnsupportedOperation, MResult}; use crate::{store, error::UnsupportedOperation, MResult};
use crate::update::{Update, next_update_id};
pub fn apply_schema_update( pub fn apply_schema_update(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
@ -13,3 +14,18 @@ pub fn apply_schema_update(
main_store.put_schema(writer, new_schema) main_store.put_schema(writer, new_schema)
} }
pub fn push_schema_update(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
schema: Schema,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SchemaUpdate(schema);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}

View File

@ -8,7 +8,7 @@ use crate::automaton::normalize_str;
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::store; use crate::store;
use crate::update::push_synonyms_addition; use crate::update::{Update, next_update_id};
use crate::{MResult, Error, RankedMap}; use crate::{MResult, Error, RankedMap};
pub struct SynonymsAddition { pub struct SynonymsAddition {
@ -57,6 +57,21 @@ impl SynonymsAddition {
} }
} }
pub fn push_synonyms_addition(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: BTreeMap<String, Vec<String>>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SynonymsAddition(addition);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn apply_synonyms_addition( pub fn apply_synonyms_addition(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
main_store: store::Main, main_store: store::Main,

View File

@ -9,7 +9,7 @@ use crate::automaton::normalize_str;
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::store; use crate::store;
use crate::update::push_synonyms_deletion; use crate::update::{Update, next_update_id};
use crate::{MResult, Error, RankedMap}; use crate::{MResult, Error, RankedMap};
pub struct SynonymsDeletion { pub struct SynonymsDeletion {
@ -67,6 +67,21 @@ impl SynonymsDeletion {
} }
} }
pub fn push_synonyms_deletion(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: BTreeMap<String, Option<Vec<String>>>,
) -> MResult<u64>
{
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::SynonymsDeletion(deletion);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn apply_synonyms_deletion( pub fn apply_synonyms_deletion(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
main_store: store::Main, main_store: store::Main,