Use update_id in UpdateBuilder

Add `the update_id` to the to the updates. The rationale is the
following:
- It allows for better tracability of the update events, thus improved
  debugging and logging.
- The enigne is now aware of what he's already processed, and can return
  it if asked. It may not make sense now, but in the future, the update
  store may not work the same way, and this information about the state
  of the engine will be desirable (distributed environement).
This commit is contained in:
mpostma 2020-12-22 16:21:07 +01:00
parent d487791b03
commit 3b60432687
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
8 changed files with 127 additions and 99 deletions

View File

@ -322,7 +322,7 @@ async fn main() -> anyhow::Result<()> {
// the type hint is necessary: https://github.com/rust-lang/rust/issues/32600 // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
move |update_id, meta, content:&_| { move |update_id, meta, content:&_| {
// We prepare the update by using the update builder. // We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(); let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {
update_builder.max_nb_chunks(max_nb_chunks); update_builder.max_nb_chunks(max_nb_chunks);
} }
@ -363,7 +363,7 @@ async fn main() -> anyhow::Result<()> {
otherwise => panic!("invalid encoding format {:?}", otherwise), otherwise => panic!("invalid encoding format {:?}", otherwise),
}; };
let result = builder.execute(reader, |indexing_step| { let result = builder.execute(reader, |indexing_step, update_id| {
let (current, total) = match indexing_step { let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
@ -430,7 +430,7 @@ async fn main() -> anyhow::Result<()> {
} }
} }
let result = builder.execute(|indexing_step| { let result = builder.execute(|indexing_step, update_id| {
let (current, total) = match indexing_step { let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),

View File

@ -625,9 +625,9 @@ mod tests {
// Set the faceted fields to be the channel. // Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into() }); builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into() });
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Test that the facet condition is correctly generated. // Test that the facet condition is correctly generated.
@ -654,9 +654,9 @@ mod tests {
// Set the faceted fields to be the channel. // Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_faceted_fields(hashmap!{ "timestamp".into() => "integer".into() }); builder.set_faceted_fields(hashmap!{ "timestamp".into() => "integer".into() });
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Test that the facet condition is correctly generated. // Test that the facet condition is correctly generated.
@ -682,13 +682,13 @@ mod tests {
// Set the faceted fields to be the channel. // Set the faceted fields to be the channel.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order
builder.set_faceted_fields(hashmap!{ builder.set_faceted_fields(hashmap!{
"channel".into() => "string".into(), "channel".into() => "string".into(),
"timestamp".into() => "integer".into(), "timestamp".into() => "integer".into(),
}); });
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Test that the facet condition is correctly generated. // Test that the facet condition is correctly generated.

View File

@ -4,11 +4,17 @@ use crate::{ExternalDocumentsIds, Index};
pub struct ClearDocuments<'t, 'u, 'i> { pub struct ClearDocuments<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
_update_id: u64,
} }
impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> ClearDocuments<'t, 'u, 'i> { pub fn new(
ClearDocuments { wtxn, index } wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64
) -> ClearDocuments<'t, 'u, 'i> {
ClearDocuments { wtxn, index, _update_id: update_id }
} }
pub fn execute(self) -> anyhow::Result<usize> { pub fn execute(self) -> anyhow::Result<usize> {

View File

@ -12,12 +12,14 @@ pub struct DeleteDocuments<'t, 'u, 'i> {
index: &'i Index, index: &'i Index,
external_documents_ids: ExternalDocumentsIds<'static>, external_documents_ids: ExternalDocumentsIds<'static>,
documents_ids: RoaringBitmap, documents_ids: RoaringBitmap,
update_id: u64,
} }
impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
pub fn new( pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
update_id: u64,
) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> ) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>>
{ {
let external_documents_ids = index let external_documents_ids = index
@ -29,6 +31,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
index, index,
external_documents_ids, external_documents_ids,
documents_ids: RoaringBitmap::new(), documents_ids: RoaringBitmap::new(),
update_id,
}) })
} }
@ -64,7 +67,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
// We can execute a ClearDocuments operation when the number of documents // We can execute a ClearDocuments operation when the number of documents
// to delete is exactly the number of documents in the database. // to delete is exactly the number of documents in the database.
if current_documents_ids_len == self.documents_ids.len() { if current_documents_ids_len == self.documents_ids.len() {
return ClearDocuments::new(self.wtxn, self.index).execute(); return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute();
} }
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;

View File

@ -24,10 +24,15 @@ pub struct Facets<'t, 'u, 'i> {
pub(crate) chunk_fusing_shrink_size: Option<u64>, pub(crate) chunk_fusing_shrink_size: Option<u64>,
level_group_size: NonZeroUsize, level_group_size: NonZeroUsize,
min_level_size: NonZeroUsize, min_level_size: NonZeroUsize,
_update_id: u64,
} }
impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Facets<'t, 'u, 'i> { pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> Facets<'t, 'u, 'i> {
Facets { Facets {
wtxn, wtxn,
index, index,
@ -36,6 +41,7 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
chunk_fusing_shrink_size: None, chunk_fusing_shrink_size: None,
level_group_size: NonZeroUsize::new(4).unwrap(), level_group_size: NonZeroUsize::new(4).unwrap(),
min_level_size: NonZeroUsize::new(5).unwrap(), min_level_size: NonZeroUsize::new(5).unwrap(),
_update_id: update_id,
} }
} }

View File

@ -220,10 +220,15 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> {
update_method: IndexDocumentsMethod, update_method: IndexDocumentsMethod,
update_format: UpdateFormat, update_format: UpdateFormat,
autogenerate_docids: bool, autogenerate_docids: bool,
update_id: u64,
} }
impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i, 'a> { pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> IndexDocuments<'t, 'u, 'i, 'a> {
IndexDocuments { IndexDocuments {
wtxn, wtxn,
index, index,
@ -240,6 +245,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
update_method: IndexDocumentsMethod::ReplaceDocuments, update_method: IndexDocumentsMethod::ReplaceDocuments,
update_format: UpdateFormat::Json, update_format: UpdateFormat::Json,
autogenerate_docids: true, autogenerate_docids: true,
update_id,
} }
} }
@ -262,9 +268,11 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<DocumentAdditionResult> pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<DocumentAdditionResult>
where where
R: io::Read, R: io::Read,
F: Fn(UpdateIndexingStep) + Sync, F: Fn(UpdateIndexingStep, u64) + Sync,
{ {
let before_transform = Instant::now(); let before_transform = Instant::now();
let update_id = self.update_id;
let progress_callback = |step| progress_callback(step, update_id);
let transform = Transform { let transform = Transform {
rtxn: &self.wtxn, rtxn: &self.wtxn,
@ -321,6 +329,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
chunk_compression_level: self.chunk_compression_level, chunk_compression_level: self.chunk_compression_level,
chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, chunk_fusing_shrink_size: self.chunk_fusing_shrink_size,
thread_pool: self.thread_pool, thread_pool: self.thread_pool,
update_id: self.update_id,
}; };
let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?;
debug!("documents to delete {:?}", replaced_documents_ids); debug!("documents to delete {:?}", replaced_documents_ids);
@ -616,7 +625,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
}); });
} }
let mut builder = Facets::new(self.wtxn, self.index); let mut builder = Facets::new(self.wtxn, self.index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level; builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
@ -651,9 +660,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -665,9 +674,9 @@ mod tests {
// Second we send 1 document with id 1, to erase the previous ones. // Second we send 1 document with id 1, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,updated kevin\n"[..]; let content = &b"id,name\n1,updated kevin\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
@ -679,9 +688,9 @@ mod tests {
// Third we send 3 documents again to replace the existing ones. // Third we send 3 documents again to replace the existing ones.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..]; let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 2);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
@ -702,10 +711,10 @@ mod tests {
// change the index method to merge documents. // change the index method to merge documents.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..]; let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is only 1 document now. // Check that there is only 1 document now.
@ -729,10 +738,10 @@ mod tests {
// Second we send 1 document with id 1, to force it to be merged with the previous one. // Second we send 1 document with id 1, to force it to be merged with the previous one.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,age\n1,25\n"[..]; let content = &b"id,age\n1,25\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 1 document. // Check that there is **always** 1 document.
@ -765,10 +774,10 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; let content = &b"name\nkevin\nkevina\nbenoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.disable_autogenerate_docids(); builder.disable_autogenerate_docids();
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
assert!(builder.execute(content, |_| ()).is_err()); assert!(builder.execute(content, |_, _| ()).is_err());
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is no document. // Check that there is no document.
@ -792,10 +801,10 @@ mod tests {
{ "name": "kevin" }, { "name": "kevin" },
{ "name": "benoit" } { "name": "benoit" }
]"#[..]; ]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.disable_autogenerate_docids(); builder.disable_autogenerate_docids();
builder.update_format(UpdateFormat::Json); builder.update_format(UpdateFormat::Json);
assert!(builder.execute(content, |_| ()).is_err()); assert!(builder.execute(content, |_, _| ()).is_err());
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is no document. // Check that there is no document.
@ -815,9 +824,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; let content = &b"name\nkevin\nkevina\nbenoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -833,9 +842,9 @@ mod tests {
// Second we send 1 document with the generated uuid, to erase the previous ones. // Second we send 1 document with the generated uuid, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = format!("id,name\n{},updated kevin", kevin_uuid); let content = format!("id,name\n{},updated kevin", kevin_uuid);
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content.as_bytes(), |_| ()).unwrap(); builder.execute(content.as_bytes(), |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
@ -868,9 +877,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -882,9 +891,9 @@ mod tests {
// Second we send 1 document without specifying the id. // Second we send 1 document without specifying the id.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nnew kevin"[..]; let content = &b"name\nnew kevin"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 4 documents now. // Check that there is 4 documents now.
@ -904,9 +913,9 @@ mod tests {
// First we send 0 documents and only headers. // First we send 0 documents and only headers.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n"[..]; let content = &b"id,name\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is no documents. // Check that there is no documents.
@ -930,9 +939,9 @@ mod tests {
{ "name": "kevina", "id": 21 }, { "name": "kevina", "id": 21 },
{ "name": "benoit" } { "name": "benoit" }
]"#[..]; ]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Json); builder.update_format(UpdateFormat::Json);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -952,9 +961,9 @@ mod tests {
// First we send 0 documents. // First we send 0 documents.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"[]"[..]; let content = &b"[]"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Json); builder.update_format(UpdateFormat::Json);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is no documents. // Check that there is no documents.
@ -978,9 +987,9 @@ mod tests {
{ "name": "kevina", "id": 21 } { "name": "kevina", "id": 21 }
{ "name": "benoit" } { "name": "benoit" }
"#[..]; "#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::JsonStream); builder.update_format(UpdateFormat::JsonStream);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -1001,18 +1010,18 @@ mod tests {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
// There is a space in the document id. // There is a space in the document id.
let content = &b"id,name\nbrume bleue,kevin\n"[..]; let content = &b"id,name\nbrume bleue,kevin\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
assert!(builder.execute(content, |_| ()).is_err()); assert!(builder.execute(content, |_, _| ()).is_err());
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// First we send 1 document with a valid id. // First we send 1 document with a valid id.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
// There is a space in the document id. // There is a space in the document id.
let content = &b"id,name\n32,kevin\n"[..]; let content = &b"id,name\n32,kevin\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 1 document now. // Check that there is 1 document now.
@ -1036,9 +1045,9 @@ mod tests {
{ "id": 1, "name": "kevina", "array": ["I", "am", "fine"] }, { "id": 1, "name": "kevina", "array": ["I", "am", "fine"] },
{ "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] } { "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] }
]"#[..]; ]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Json); builder.update_format(UpdateFormat::Json);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 1 documents now. // Check that there is 1 documents now.

View File

@ -23,6 +23,7 @@ pub struct Settings<'a, 't, 'u, 'i> {
pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>, pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) thread_pool: Option<&'a ThreadPool>, pub(crate) thread_pool: Option<&'a ThreadPool>,
update_id: u64,
// If a struct field is set to `None` it means that it hasn't been set by the user, // If a struct field is set to `None` it means that it hasn't been set by the user,
// however if it is `Some(None)` it means that the user forced a reset of the setting. // however if it is `Some(None)` it means that the user forced a reset of the setting.
@ -33,7 +34,11 @@ pub struct Settings<'a, 't, 'u, 'i> {
} }
impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> { pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> Settings<'a, 't, 'u, 'i> {
Settings { Settings {
wtxn, wtxn,
index, index,
@ -49,6 +54,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
displayed_fields: None, displayed_fields: None,
faceted_fields: None, faceted_fields: None,
criteria: None, criteria: None,
update_id,
} }
} }
@ -86,9 +92,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
fn reindex<F>(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> anyhow::Result<()> fn reindex<F>(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> anyhow::Result<()>
where where
F: Fn(UpdateIndexingStep) + Sync, F: Fn(UpdateIndexingStep, u64) + Sync
{ {
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
let update_id = self.update_id;
let cb = |step| cb(step, update_id);
// if the settings are set before any document update, we don't need to do anything, and // if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition. // will set the primary key during the first document addition.
if self.index.number_of_documents(&self.wtxn)? == 0 { if self.index.number_of_documents(&self.wtxn)? == 0 {
@ -118,11 +126,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
fields_ids_map.clone())?; fields_ids_map.clone())?;
// We clear the full database (words-fst, documents ids and documents content). // We clear the full database (words-fst, documents ids and documents content).
ClearDocuments::new(self.wtxn, self.index).execute()?; ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?;
// We index the generated `TransformOutput` which must contain // We index the generated `TransformOutput` which must contain
// all the documents with fields in the newly defined searchable order. // all the documents with fields in the newly defined searchable order.
let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index); let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index, self.update_id);
indexing_builder.log_every_n = self.log_every_n; indexing_builder.log_every_n = self.log_every_n;
indexing_builder.max_nb_chunks = self.max_nb_chunks; indexing_builder.max_nb_chunks = self.max_nb_chunks;
indexing_builder.max_memory = self.max_memory; indexing_builder.max_memory = self.max_memory;
@ -239,7 +247,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
pub fn execute<F>(mut self, progress_callback: F) -> anyhow::Result<()> pub fn execute<F>(mut self, progress_callback: F) -> anyhow::Result<()>
where where
F: Fn(UpdateIndexingStep) + Sync F: Fn(UpdateIndexingStep, u64) + Sync
{ {
let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?; let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?;
self.update_displayed()?; self.update_displayed()?;
@ -276,16 +284,16 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// We change the searchable fields to be the "name" field only. // We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 1);
builder.set_searchable_fields(vec!["name".into()]); builder.set_searchable_fields(vec!["name".into()]);
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the searchable field is correctly set to "name" only. // Check that the searchable field is correctly set to "name" only.
@ -305,9 +313,9 @@ mod tests {
// We change the searchable fields to be the "name" field only. // We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 2);
builder.reset_searchable_fields(); builder.reset_searchable_fields();
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the searchable field have been reset and documents are found now. // Check that the searchable field have been reset and documents are found now.
@ -331,18 +339,18 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// In the same transaction we change the displayed fields to be only the "age". // In the same transaction we change the displayed fields to be only the "age".
// We also change the searchable fields to be the "name" field only. // We also change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 1);
builder.set_displayed_fields(vec!["age".into()]); builder.set_displayed_fields(vec!["age".into()]);
builder.set_searchable_fields(vec!["name".into()]); builder.set_searchable_fields(vec!["name".into()]);
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value). // Check that the displayed fields are correctly set to `None` (default value).
@ -353,9 +361,9 @@ mod tests {
// We change the searchable fields to be the "name" field only. // We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 2);
builder.reset_searchable_fields(); builder.reset_searchable_fields();
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields always contains only the "age" field. // Check that the displayed fields always contains only the "age" field.
@ -375,9 +383,9 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value). // Check that the displayed fields are correctly set to `None` (default value).
@ -397,14 +405,14 @@ mod tests {
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
// In the same transaction we change the displayed fields to be only the age. // In the same transaction we change the displayed fields to be only the age.
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_displayed_fields(vec!["age".into()]); builder.set_displayed_fields(vec!["age".into()]);
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to only the "age" field. // Check that the displayed fields are correctly set to only the "age" field.
@ -415,9 +423,9 @@ mod tests {
// We reset the fields ids to become `None`, the default value. // We reset the fields ids to become `None`, the default value.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.reset_displayed_fields(); builder.reset_displayed_fields();
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value). // Check that the displayed fields are correctly set to `None` (default value).
@ -436,15 +444,15 @@ mod tests {
// Set the faceted fields to be the age. // Set the faceted fields to be the age.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into() }); builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into() });
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
// Then index some documents. // Then index some documents.
let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 1);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that the displayed fields are correctly set. // Check that the displayed fields are correctly set.
@ -459,9 +467,9 @@ mod tests {
// Index a little more documents with new and current facets values. // Index a little more documents with new and current facets values.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name,age\nkevin2,23\nkevina2,21\nbenoit2,35\n"[..]; let content = &b"name,age\nkevin2,23\nkevina2,21\nbenoit2,35\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index, 2);
builder.update_format(UpdateFormat::Csv); builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
@ -480,14 +488,14 @@ mod tests {
// Set all the settings except searchable // Set all the settings except searchable
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 0);
builder.set_displayed_fields(vec!["hello".to_string()]); builder.set_displayed_fields(vec!["hello".to_string()]);
builder.set_faceted_fields(hashmap!{ builder.set_faceted_fields(hashmap!{
"age".into() => "integer".into(), "age".into() => "integer".into(),
"toto".into() => "integer".into(), "toto".into() => "integer".into(),
}); });
builder.set_criteria(vec!["asc(toto)".to_string()]); builder.set_criteria(vec!["asc(toto)".to_string()]);
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// check the output // check the output
@ -500,9 +508,9 @@ mod tests {
// We set toto and age as searchable to force reordering of the fields // We set toto and age as searchable to force reordering of the fields
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index); let mut builder = Settings::new(&mut wtxn, &index, 1);
builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]); builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]);
builder.execute(|_| ()).unwrap(); builder.execute(|_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();

View File

@ -13,10 +13,11 @@ pub struct UpdateBuilder<'a> {
pub(crate) chunk_compression_level: Option<u32>, pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>, pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) thread_pool: Option<&'a ThreadPool>, pub(crate) thread_pool: Option<&'a ThreadPool>,
pub(crate) update_id: u64,
} }
impl<'a> UpdateBuilder<'a> { impl<'a> UpdateBuilder<'a> {
pub fn new() -> UpdateBuilder<'a> { pub fn new(update_id: u64) -> UpdateBuilder<'a> {
UpdateBuilder { UpdateBuilder {
log_every_n: None, log_every_n: None,
max_nb_chunks: None, max_nb_chunks: None,
@ -26,6 +27,7 @@ impl<'a> UpdateBuilder<'a> {
chunk_compression_level: None, chunk_compression_level: None,
chunk_fusing_shrink_size: None, chunk_fusing_shrink_size: None,
thread_pool: None, thread_pool: None,
update_id,
} }
} }
@ -67,7 +69,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index, index: &'i Index,
) -> ClearDocuments<'t, 'u, 'i> ) -> ClearDocuments<'t, 'u, 'i>
{ {
ClearDocuments::new(wtxn, index) ClearDocuments::new(wtxn, index, self.update_id)
} }
pub fn delete_documents<'t, 'u, 'i>( pub fn delete_documents<'t, 'u, 'i>(
@ -76,7 +78,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index, index: &'i Index,
) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> ) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>>
{ {
DeleteDocuments::new(wtxn, index) DeleteDocuments::new(wtxn, index, self.update_id)
} }
pub fn index_documents<'t, 'u, 'i>( pub fn index_documents<'t, 'u, 'i>(
@ -85,7 +87,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index, index: &'i Index,
) -> IndexDocuments<'t, 'u, 'i, 'a> ) -> IndexDocuments<'t, 'u, 'i, 'a>
{ {
let mut builder = IndexDocuments::new(wtxn, index); let mut builder = IndexDocuments::new(wtxn, index, self.update_id);
builder.log_every_n = self.log_every_n; builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks; builder.max_nb_chunks = self.max_nb_chunks;
@ -105,7 +107,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index, index: &'i Index,
) -> Settings<'a, 't, 'u, 'i> ) -> Settings<'a, 't, 'u, 'i>
{ {
let mut builder = Settings::new(wtxn, index); let mut builder = Settings::new(wtxn, index, self.update_id);
builder.log_every_n = self.log_every_n; builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks; builder.max_nb_chunks = self.max_nb_chunks;
@ -125,7 +127,7 @@ impl<'a> UpdateBuilder<'a> {
index: &'i Index, index: &'i Index,
) -> Facets<'t, 'u, 'i> ) -> Facets<'t, 'u, 'i>
{ {
let mut builder = Facets::new(wtxn, index); let mut builder = Facets::new(wtxn, index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level; builder.chunk_compression_level = self.chunk_compression_level;
@ -134,9 +136,3 @@ impl<'a> UpdateBuilder<'a> {
builder builder
} }
} }
impl Default for UpdateBuilder<'_> {
fn default() -> Self {
Self::new()
}
}