Merge pull request #78 from meilisearch/required-changes-for-transplant

Changes for transplant
This commit is contained in:
Clément Renault 2021-02-02 16:22:09 +01:00 committed by GitHub
commit d450b971f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 145 additions and 108 deletions

View File

@ -322,7 +322,7 @@ async fn main() -> anyhow::Result<()> {
// the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
move |update_id, meta, content:&_| {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new();
let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {
update_builder.max_nb_chunks(max_nb_chunks);
}
@ -363,7 +363,7 @@ async fn main() -> anyhow::Result<()> {
otherwise => panic!("invalid encoding format {:?}", otherwise),
};
let result = builder.execute(reader, |indexing_step| {
let result = builder.execute(reader, |indexing_step, update_id| {
let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> {
});
match result {
Ok(()) => wtxn.commit().map_err(Into::into),
Ok(_) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into())
}
},
@ -430,7 +430,7 @@ async fn main() -> anyhow::Result<()> {
}
}
let result = builder.execute(|indexing_step| {
let result = builder.execute(|indexing_step, update_id| {
let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),

View File

@ -625,9 +625,9 @@ mod tests {
// Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into() });
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Test that the facet condition is correctly generated.
@ -654,9 +654,9 @@ mod tests {
// Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_faceted_fields(hashmap!{ "timestamp".into() => "integer".into() });
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Test that the facet condition is correctly generated.
@ -682,13 +682,13 @@ mod tests {
// Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order
builder.set_faceted_fields(hashmap!{
"channel".into() => "string".into(),
"timestamp".into() => "integer".into(),
});
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Test that the facet condition is correctly generated.
@ -732,13 +732,13 @@ mod tests {
// Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order
builder.set_faceted_fields(hashmap!{
"channel".into() => "string".into(),
"timestamp".into() => "integer".into(),
});
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Test that the facet condition is correctly generated.

View File

@ -4,11 +4,17 @@ use crate::{ExternalDocumentsIds, Index};
pub struct ClearDocuments<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
_update_id: u64,
}
impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> ClearDocuments<'t, 'u, 'i> {
ClearDocuments { wtxn, index }
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64
) -> ClearDocuments<'t, 'u, 'i> {
ClearDocuments { wtxn, index, _update_id: update_id }
}
pub fn execute(self) -> anyhow::Result<usize> {

View File

@ -12,12 +12,14 @@ pub struct DeleteDocuments<'t, 'u, 'i> {
index: &'i Index,
external_documents_ids: ExternalDocumentsIds<'static>,
documents_ids: RoaringBitmap,
update_id: u64,
}
impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>>
{
let external_documents_ids = index
@ -29,6 +31,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
index,
external_documents_ids,
documents_ids: RoaringBitmap::new(),
update_id,
})
}
@ -64,7 +67,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
// We can execute a ClearDocuments operation when the number of documents
// to delete is exactly the number of documents in the database.
if current_documents_ids_len == self.documents_ids.len() {
return ClearDocuments::new(self.wtxn, self.index).execute();
return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute();
}
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;

View File

@ -24,10 +24,15 @@ pub struct Facets<'t, 'u, 'i> {
pub(crate) chunk_fusing_shrink_size: Option<u64>,
level_group_size: NonZeroUsize,
min_level_size: NonZeroUsize,
_update_id: u64,
}
impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Facets<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> Facets<'t, 'u, 'i> {
Facets {
wtxn,
index,
@ -36,6 +41,7 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
chunk_fusing_shrink_size: None,
level_group_size: NonZeroUsize::new(4).unwrap(),
min_level_size: NonZeroUsize::new(5).unwrap(),
_update_id: update_id,
}
}

View File

@ -12,8 +12,9 @@ use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use heed::types::ByteSlice;
use log::{debug, info, error};
use memmap::Mmap;
use rayon::prelude::*;
use rayon::ThreadPool;
use rayon::prelude::*;
use serde::{Serialize, Deserialize};
use crate::index::Index;
use crate::update::{Facets, UpdateIndexingStep};
@ -32,6 +33,11 @@ mod merge_function;
mod store;
mod transform;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DocumentAdditionResult {
nb_documents: usize,
}
#[derive(Debug, Copy, Clone)]
pub enum WriteMethod {
Append,
@ -175,7 +181,7 @@ pub fn write_into_lmdb_database(
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum IndexDocumentsMethod {
/// Replace the previous document with the new one,
@ -187,7 +193,7 @@ pub enum IndexDocumentsMethod {
UpdateDocuments,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum UpdateFormat {
/// The given update is a real **comma seperated** CSV with headers on the first line.
@ -214,10 +220,15 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> {
update_method: IndexDocumentsMethod,
update_format: UpdateFormat,
autogenerate_docids: bool,
update_id: u64,
}
impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i, 'a> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> IndexDocuments<'t, 'u, 'i, 'a> {
IndexDocuments {
wtxn,
index,
@ -234,6 +245,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
update_method: IndexDocumentsMethod::ReplaceDocuments,
update_format: UpdateFormat::Json,
autogenerate_docids: true,
update_id,
}
}
@ -253,12 +265,14 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.autogenerate_docids = false;
}
pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<()>
pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<DocumentAdditionResult>
where
R: io::Read,
F: Fn(UpdateIndexingStep) + Sync,
F: Fn(UpdateIndexingStep, u64) + Sync,
{
let before_transform = Instant::now();
let update_id = self.update_id;
let progress_callback = |step| progress_callback(step, update_id);
let transform = Transform {
rtxn: &self.wtxn,
@ -279,9 +293,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
UpdateFormat::JsonStream => transform.output_from_json_stream(reader, &progress_callback)?,
};
let nb_documents = output.documents_count;
info!("Update transformed in {:.02?}", before_transform.elapsed());
self.execute_raw(output, progress_callback)
self.execute_raw(output, progress_callback)?;
Ok(DocumentAdditionResult { nb_documents })
}
pub fn execute_raw<F>(self, output: TransformOutput, progress_callback: F) -> anyhow::Result<()>
@ -312,6 +329,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
chunk_compression_level: self.chunk_compression_level,
chunk_fusing_shrink_size: self.chunk_fusing_shrink_size,
thread_pool: self.thread_pool,
update_id: self.update_id,
};
let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?;
debug!("documents to delete {:?}", replaced_documents_ids);
@ -607,7 +625,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
});
}
let mut builder = Facets::new(self.wtxn, self.index);
let mut builder = Facets::new(self.wtxn, self.index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
@ -642,9 +660,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -656,9 +674,9 @@ mod tests {
// Second we send 1 document with id 1, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,updated kevin\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 3 documents.
@ -670,9 +688,9 @@ mod tests {
// Third we send 3 documents again to replace the existing ones.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 2);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 3 documents.
@ -693,10 +711,10 @@ mod tests {
// change the index method to merge documents.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is only 1 document now.
@ -720,10 +738,10 @@ mod tests {
// Second we send 1 document with id 1, to force it to be merged with the previous one.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,age\n1,25\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 1 document.
@ -756,10 +774,10 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nkevin\nkevina\nbenoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.disable_autogenerate_docids();
builder.update_format(UpdateFormat::Csv);
assert!(builder.execute(content, |_| ()).is_err());
assert!(builder.execute(content, |_, _| ()).is_err());
wtxn.commit().unwrap();
// Check that there is no document.
@ -783,10 +801,10 @@ mod tests {
{ "name": "kevin" },
{ "name": "benoit" }
]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.disable_autogenerate_docids();
builder.update_format(UpdateFormat::Json);
assert!(builder.execute(content, |_| ()).is_err());
assert!(builder.execute(content, |_, _| ()).is_err());
wtxn.commit().unwrap();
// Check that there is no document.
@ -806,9 +824,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nkevin\nkevina\nbenoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -824,9 +842,9 @@ mod tests {
// Second we send 1 document with the generated uuid, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap();
let content = format!("id,name\n{},updated kevin", kevin_uuid);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv);
builder.execute(content.as_bytes(), |_| ()).unwrap();
builder.execute(content.as_bytes(), |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 3 documents.
@ -859,9 +877,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -873,9 +891,9 @@ mod tests {
// Second we send 1 document without specifying the id.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nnew kevin"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 4 documents now.
@ -895,9 +913,9 @@ mod tests {
// First we send 0 documents and only headers.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is no documents.
@ -921,9 +939,9 @@ mod tests {
{ "name": "kevina", "id": 21 },
{ "name": "benoit" }
]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Json);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -943,9 +961,9 @@ mod tests {
// First we send 0 documents.
let mut wtxn = index.write_txn().unwrap();
let content = &b"[]"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Json);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is no documents.
@ -969,9 +987,9 @@ mod tests {
{ "name": "kevina", "id": 21 }
{ "name": "benoit" }
"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::JsonStream);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -992,18 +1010,18 @@ mod tests {
let mut wtxn = index.write_txn().unwrap();
// There is a space in the document id.
let content = &b"id,name\nbrume bleue,kevin\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
assert!(builder.execute(content, |_| ()).is_err());
assert!(builder.execute(content, |_, _| ()).is_err());
wtxn.commit().unwrap();
// First we send 1 document with a valid id.
let mut wtxn = index.write_txn().unwrap();
// There is a space in the document id.
let content = &b"id,name\n32,kevin\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 1 document now.
@ -1027,9 +1045,9 @@ mod tests {
{ "id": 1, "name": "kevina", "array": ["I", "am", "fine"] },
{ "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] }
]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Json);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 1 documents now.

View File

@ -10,7 +10,7 @@ mod update_step;
pub use self::available_documents_ids::AvailableDocumentsIds;
pub use self::clear_documents::ClearDocuments;
pub use self::delete_documents::DeleteDocuments;
pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat};
pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
pub use self::facets::Facets;
pub use self::settings::Settings;
pub use self::update_builder::UpdateBuilder;

View File

@ -23,6 +23,7 @@ pub struct Settings<'a, 't, 'u, 'i> {
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) thread_pool: Option<&'a ThreadPool>,
update_id: u64,
// If a struct field is set to `None` it means that it hasn't been set by the user,
// however if it is `Some(None)` it means that the user forced a reset of the setting.
@ -33,7 +34,11 @@ pub struct Settings<'a, 't, 'u, 'i> {
}
impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> Settings<'a, 't, 'u, 'i> {
Settings {
wtxn,
index,
@ -49,6 +54,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
displayed_fields: None,
faceted_fields: None,
criteria: None,
update_id,
}
}
@ -86,9 +92,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
fn reindex<F>(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> anyhow::Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
F: Fn(UpdateIndexingStep, u64) + Sync
{
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
let update_id = self.update_id;
let cb = |step| cb(step, update_id);
// if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition.
if self.index.number_of_documents(&self.wtxn)? == 0 {
@ -118,11 +126,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
fields_ids_map.clone())?;
// We clear the full database (words-fst, documents ids and documents content).
ClearDocuments::new(self.wtxn, self.index).execute()?;
ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?;
// We index the generated `TransformOutput` which must contain
// all the documents with fields in the newly defined searchable order.
let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index);
let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index, self.update_id);
indexing_builder.log_every_n = self.log_every_n;
indexing_builder.max_nb_chunks = self.max_nb_chunks;
indexing_builder.max_memory = self.max_memory;
@ -239,7 +247,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
pub fn execute<F>(mut self, progress_callback: F) -> anyhow::Result<()>
where
F: Fn(UpdateIndexingStep) + Sync
F: Fn(UpdateIndexingStep, u64) + Sync
{
let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?;
self.update_displayed()?;
@ -276,16 +284,16 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 1);
builder.set_searchable_fields(vec!["name".into()]);
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the searchable field is correctly set to "name" only.
@ -305,9 +313,9 @@ mod tests {
// We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 2);
builder.reset_searchable_fields();
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the searchable field have been reset and documents are found now.
@ -331,18 +339,18 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// In the same transaction we change the displayed fields to be only the "age".
// We also change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 1);
builder.set_displayed_fields(vec!["age".into()]);
builder.set_searchable_fields(vec!["name".into()]);
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value).
@ -353,9 +361,9 @@ mod tests {
// We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 2);
builder.reset_searchable_fields();
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields always contains only the "age" field.
@ -375,9 +383,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value).
@ -397,14 +405,14 @@ mod tests {
// First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
// In the same transaction we change the displayed fields to be only the age.
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_displayed_fields(vec!["age".into()]);
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to only the "age" field.
@ -415,9 +423,9 @@ mod tests {
// We reset the fields ids to become `None`, the default value.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.reset_displayed_fields();
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value).
@ -436,15 +444,15 @@ mod tests {
// Set the faceted fields to be the age.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into() });
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
// Then index some documents.
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set.
@ -459,9 +467,9 @@ mod tests {
// Index a little more documents with new and current facets values.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin2,23\nkevina2,21\nbenoit2,35\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder = IndexDocuments::new(&mut wtxn, &index, 2);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap();
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();
@ -480,14 +488,14 @@ mod tests {
// Set all the settings except searchable
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_displayed_fields(vec!["hello".to_string()]);
builder.set_faceted_fields(hashmap!{
"age".into() => "integer".into(),
"toto".into() => "integer".into(),
});
builder.set_criteria(vec!["asc(toto)".to_string()]);
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
// check the output
@ -500,9 +508,9 @@ mod tests {
// We set toto and age as searchable to force reordering of the fields
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, 1);
builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]);
builder.execute(|_| ()).unwrap();
builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();

View File

@ -13,10 +13,11 @@ pub struct UpdateBuilder<'a> {
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) thread_pool: Option<&'a ThreadPool>,
pub(crate) update_id: u64,
}
impl<'a> UpdateBuilder<'a> {
pub fn new() -> UpdateBuilder<'a> {
pub fn new(update_id: u64) -> UpdateBuilder<'a> {
UpdateBuilder {
log_every_n: None,
max_nb_chunks: None,
@ -26,6 +27,7 @@ impl<'a> UpdateBuilder<'a> {
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
thread_pool: None,
update_id,
}
}
@ -67,7 +69,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index,
) -> ClearDocuments<'t, 'u, 'i>
{
ClearDocuments::new(wtxn, index)
ClearDocuments::new(wtxn, index, self.update_id)
}
pub fn delete_documents<'t, 'u, 'i>(
@ -76,7 +78,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index,
) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>>
{
DeleteDocuments::new(wtxn, index)
DeleteDocuments::new(wtxn, index, self.update_id)
}
pub fn index_documents<'t, 'u, 'i>(
@ -85,7 +87,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index,
) -> IndexDocuments<'t, 'u, 'i, 'a>
{
let mut builder = IndexDocuments::new(wtxn, index);
let mut builder = IndexDocuments::new(wtxn, index, self.update_id);
builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks;
@ -105,7 +107,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index,
) -> Settings<'a, 't, 'u, 'i>
{
let mut builder = Settings::new(wtxn, index);
let mut builder = Settings::new(wtxn, index, self.update_id);
builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks;
@ -125,7 +127,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index,
) -> Facets<'t, 'u, 'i>
{
let mut builder = Facets::new(wtxn, index);
let mut builder = Facets::new(wtxn, index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
@ -134,9 +136,3 @@ impl<'a> UpdateBuilder<'a> {
builder
}
}
impl Default for UpdateBuilder<'_> {
fn default() -> Self {
Self::new()
}
}