From 1e1821f0025fafe2b34e02056d6ac835e3ab466a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 26 Oct 2020 10:55:07 +0100 Subject: [PATCH] Introduce the merge_two_obkv function to merge documents on update --- src/indexing/transform.rs | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/src/indexing/transform.rs b/src/indexing/transform.rs index 9e5b1ae6c..901b12dbd 100644 --- a/src/indexing/transform.rs +++ b/src/indexing/transform.rs @@ -4,11 +4,12 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use anyhow::Context; -use crate::{FieldsIdsMap, AvailableDocumentsIds}; use fst::{IntoStreamer, Streamer}; use grenad::CompressionType; use roaring::RoaringBitmap; +use crate::FieldsIdsMap; +use crate::update::AvailableDocumentsIds; use super::{create_writer, create_sorter}; pub struct TransformOutput { @@ -31,6 +32,22 @@ pub struct Transform { pub max_memory: Option, } +fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec) { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + buffer.clear(); + + let mut writer = obkv::KvWriter::new(buffer); + for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) { + match eob { + Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(), + } + } + + writer.finish().unwrap(); +} + impl> Transform { /// Extract the users ids, deduplicate and compute the new internal documents ids /// and fields ids, writing all the documents under their internal ids into a final file. @@ -101,15 +118,26 @@ impl> Transform { // While we write into final file we get or generate the internal documents ids. let mut documents_count = 0; let mut iter = sorter.into_iter()?; - while let Some((user_id, obkv)) = iter.next()? { + while let Some((user_id, update_obkv)) = iter.next()? { - let docid = match self.users_ids_documents_ids.get(user_id) { + let (docid, obkv) = match self.users_ids_documents_ids.get(user_id) { Some(docid) => { // If we find the user id in the current users ids documents ids map // we use it and insert it in the list of replaced documents. let docid = u32::try_from(docid).expect("valid document id"); replaced_documents_ids.insert(docid); - docid + + // Depending on the update indexing method we will merge + // the document update with the current document or not. + let must_merge_documents = false; + if must_merge_documents { + let base_obkv = todo!(); + let update_obkv = obkv::KvReader::new(update_obkv); + merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); + (docid, obkv_buffer.as_slice()) + } else { + (docid, update_obkv) + } }, None => { // If this user id is new we add it to the users ids documents ids map @@ -118,7 +146,7 @@ impl> Transform { .context("no more available documents ids")?; new_users_ids_documents_ids_builder.insert(user_id, new_docid as u64)?; new_documents_ids.insert(new_docid); - new_docid + (new_docid, update_obkv) }, };