Add a test to check that merging works correctly with CSVs

This commit is contained in:
Clément Renault 2020-10-30 13:46:56 +01:00
parent 955302fd95
commit 0d01e4854b
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 73 additions and 10 deletions

View File

@ -70,7 +70,7 @@ pub fn documents_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result<Vec<
bail!("merging documents is an error ({:?})", key.as_bstr()) bail!("merging documents is an error ({:?})", key.as_bstr())
} }
pub fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec<u8>) { pub fn merge_two_obkvs(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec<u8>) {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};

View File

@ -491,7 +491,7 @@ mod tests {
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
#[test] #[test]
fn simple_replacement() { fn simple_document_replacement() {
let path = tempfile::tempdir().unwrap(); let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB options.map_size(10 * 1024 * 1024); // 10 MB
@ -516,7 +516,7 @@ mod tests {
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always*** 3 documents. // Check that there is **always** 3 documents.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3); assert_eq!(count, 3);
@ -528,10 +528,73 @@ mod tests {
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always*** 3 documents. // Check that there is **always** 3 documents.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3); assert_eq!(count, 3);
drop(rtxn); drop(rtxn);
} }
#[test]
fn simple_document_merge() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
// First we send 3 documents with duplicate ids and
// change the index method to merge documents.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is only 1 document now.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 1);
// Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1);
let (id, doc) = docs[0];
assert_eq!(id, 0);
// Check that this document is equal to the last one sent.
let mut doc_iter = doc.iter();
assert_eq!(doc_iter.next(), Some((0, &br#""1""#[..])));
assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
assert_eq!(doc_iter.next(), None);
drop(rtxn);
// Second we send 1 document with id 1, to force it to be merged with the previous one.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,age\n1,25\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 1 document.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 1);
// Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1);
let (id, doc) = docs[0];
assert_eq!(id, 0);
// Check that this document is equal to the last one sent.
let mut doc_iter = doc.iter();
assert_eq!(doc_iter.next(), Some((0, &br#""1""#[..])));
assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
assert_eq!(doc_iter.next(), Some((2, &br#""25""#[..])));
assert_eq!(doc_iter.next(), None);
drop(rtxn);
}
} }

View File

@ -10,7 +10,7 @@ use roaring::RoaringBitmap;
use crate::{BEU32, Index, FieldsIdsMap}; use crate::{BEU32, Index, FieldsIdsMap};
use crate::update::AvailableDocumentsIds; use crate::update::AvailableDocumentsIds;
use super::merge_function::merge_two_obkv; use super::merge_function::merge_two_obkvs;
use super::{create_writer, create_sorter, IndexDocumentsMethod}; use super::{create_writer, create_sorter, IndexDocumentsMethod};
pub struct TransformOutput { pub struct TransformOutput {
@ -58,14 +58,14 @@ impl Transform<'_, '_> {
fields_ids.push(id); fields_ids.push(id);
} }
/// The last value associated with an id is kept. /// Only the last value associated with an id is kept.
fn merge_last_win(_key: &[u8], vals: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
vals.last().context("no last value").map(|last| last.clone().into_owned()) obkvs.last().context("no last value").map(|last| last.clone().into_owned())
} }
// We initialize the sorter with the user indexing settings. // We initialize the sorter with the user indexing settings.
let mut sorter = create_sorter( let mut sorter = create_sorter(
merge_last_win, keep_latest_obkv,
self.chunk_compression_type, self.chunk_compression_type,
self.chunk_compression_level, self.chunk_compression_level,
self.chunk_fusing_shrink_size, self.chunk_fusing_shrink_size,
@ -132,7 +132,7 @@ impl Transform<'_, '_> {
let base_obkv = self.index.documents.get(&self.rtxn, &key)? let base_obkv = self.index.documents.get(&self.rtxn, &key)?
.context("document not found")?; .context("document not found")?;
let update_obkv = obkv::KvReader::new(update_obkv); let update_obkv = obkv::KvReader::new(update_obkv);
merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); merge_two_obkvs(base_obkv, update_obkv, &mut obkv_buffer);
(docid, obkv_buffer.as_slice()) (docid, obkv_buffer.as_slice())
} }
} }