Introduce the merge_two_obkv function to merge documents on update

This commit is contained in:
Clément Renault 2020-10-26 10:55:07 +01:00
parent 60347a5483
commit 1e1821f002
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -4,11 +4,12 @@ use std::fs::File;
use std::io::{Read, Seek, SeekFrom}; use std::io::{Read, Seek, SeekFrom};
use anyhow::Context; use anyhow::Context;
use crate::{FieldsIdsMap, AvailableDocumentsIds};
use fst::{IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
use grenad::CompressionType; use grenad::CompressionType;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::FieldsIdsMap;
use crate::update::AvailableDocumentsIds;
use super::{create_writer, create_sorter}; use super::{create_writer, create_sorter};
pub struct TransformOutput { pub struct TransformOutput {
@ -31,6 +32,22 @@ pub struct Transform<A> {
pub max_memory: Option<usize>, pub max_memory: Option<usize>,
} }
fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec<u8>) {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
buffer.clear();
let mut writer = obkv::KvWriter::new(buffer);
for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) {
match eob {
Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(),
}
}
writer.finish().unwrap();
}
impl<A: AsRef<[u8]>> Transform<A> { impl<A: AsRef<[u8]>> Transform<A> {
/// Extract the users ids, deduplicate and compute the new internal documents ids /// Extract the users ids, deduplicate and compute the new internal documents ids
/// and fields ids, writing all the documents under their internal ids into a final file. /// and fields ids, writing all the documents under their internal ids into a final file.
@ -101,15 +118,26 @@ impl<A: AsRef<[u8]>> Transform<A> {
// While we write into final file we get or generate the internal documents ids. // While we write into final file we get or generate the internal documents ids.
let mut documents_count = 0; let mut documents_count = 0;
let mut iter = sorter.into_iter()?; let mut iter = sorter.into_iter()?;
while let Some((user_id, obkv)) = iter.next()? { while let Some((user_id, update_obkv)) = iter.next()? {
let docid = match self.users_ids_documents_ids.get(user_id) { let (docid, obkv) = match self.users_ids_documents_ids.get(user_id) {
Some(docid) => { Some(docid) => {
// If we find the user id in the current users ids documents ids map // If we find the user id in the current users ids documents ids map
// we use it and insert it in the list of replaced documents. // we use it and insert it in the list of replaced documents.
let docid = u32::try_from(docid).expect("valid document id"); let docid = u32::try_from(docid).expect("valid document id");
replaced_documents_ids.insert(docid); replaced_documents_ids.insert(docid);
docid
// Depending on the update indexing method we will merge
// the document update with the current document or not.
let must_merge_documents = false;
if must_merge_documents {
let base_obkv = todo!();
let update_obkv = obkv::KvReader::new(update_obkv);
merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer);
(docid, obkv_buffer.as_slice())
} else {
(docid, update_obkv)
}
}, },
None => { None => {
// If this user id is new we add it to the users ids documents ids map // If this user id is new we add it to the users ids documents ids map
@ -118,7 +146,7 @@ impl<A: AsRef<[u8]>> Transform<A> {
.context("no more available documents ids")?; .context("no more available documents ids")?;
new_users_ids_documents_ids_builder.insert(user_id, new_docid as u64)?; new_users_ids_documents_ids_builder.insert(user_id, new_docid as u64)?;
new_documents_ids.insert(new_docid); new_documents_ids.insert(new_docid);
new_docid (new_docid, update_obkv)
}, },
}; };