Fix transform reindexing path

ManyTheFish 2023-11-02 13:37:54 +01:00
parent 1b4ff991c0
commit bc51d6157a
2 changed files with 56 additions and 28 deletions


@@ -18,7 +18,7 @@ pub struct DocumentOperation {
     pub kind: DocumentOperationKind,
 }
 
-pub struct ExternalDocumentsIds(Database<Str, OwnedType<BEU32>>);
+pub struct ExternalDocumentsIds(pub Database<Str, OwnedType<BEU32>>);
 
 impl ExternalDocumentsIds {
     pub fn new(db: Database<Str, OwnedType<BEU32>>) -> ExternalDocumentsIds {
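Note: exposing the tuple field is what lets the transform's reindexing loop further down walk the external-id database directly instead of going through ExternalDocumentsIds helper methods. A minimal sketch of that access pattern, assuming the milli crate-root re-exports (Index, Result) and an index opened by the caller:

    use milli::{Index, Result};

    // Sketch: walk every (external id, internal docid) pair in the index.
    // `.0` is the now-public heed database of Str keys to BEU32 values;
    // `.get()` turns the big-endian wrapper back into a plain u32.
    fn dump_external_ids(index: &Index) -> Result<()> {
        let rtxn = index.read_txn()?;
        for entry in index.external_documents_ids().0.iter(&rtxn)? {
            let (external_id, docid) = entry?;
            println!("{external_id} -> {}", docid.get());
        }
        Ok(())
    }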


@@ -14,14 +14,15 @@ use serde_json::Value;
 use smartstring::SmartString;
 
 use super::helpers::{
-    create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions,
-    obkvs_merge_additions_and_deletions, MergeFn,
+    create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions,
+    obkvs_merge_additions_and_deletions, sorter_into_reader, MergeFn,
 };
 use super::{IndexDocumentsMethod, IndexerConfig};
 use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
 use crate::index::{db_name, main_key};
 use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd};
+use crate::update::index_documents::GrenadParameters;
 use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
 use crate::{
     FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
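The two new helper imports drive the rest of the change: keep_first is the merge function handed to the sorters (when the same key is inserted more than once, the first value wins), and sorter_into_reader turns a finished sorter into a grenad reader. A rough sketch of what a keep-first merge function looks like, with a simplified error type rather than milli's own Result:

    use std::borrow::Cow;

    // Sketch: resolve duplicate sorter keys by keeping the first value seen.
    fn keep_first<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> std::io::Result<Cow<'a, [u8]>> {
        Ok(values[0].clone())
    }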
@@ -772,24 +773,35 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_ids = self.index.documents_ids(wtxn)?;
         let documents_count = documents_ids.len() as usize;
 
-        // We create a final writer to write the new documents in order from the sorter.
-        let mut original_writer = create_writer(
+        // We initialize the sorter with the user indexing settings.
+        let mut original_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            keep_first,
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            tempfile::tempfile()?,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
         );
 
-        // We create a final writer to write the new documents in order from the sorter.
-        let mut flattened_writer = create_writer(
+        // We initialize the sorter with the user indexing settings.
+        let mut flattened_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            keep_first,
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            tempfile::tempfile()?,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
         );
 
         let mut obkv_buffer = Vec::new();
-        let mut document_sorter_buffer = Vec::new();
-        for result in self.index.all_documents(wtxn)? {
-            let (docid, obkv) = result?;
+        let mut document_sorter_key_buffer = Vec::new();
+        let mut document_sorter_value_buffer = Vec::new();
+        for result in self.index.external_documents_ids().0.iter(wtxn)? {
+            let (external_id, docid) = result?;
+            let obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
+                InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
+            )?;
+            let docid = docid.get();
 
             obkv_buffer.clear();
             let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);
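Two details of the sorter setup above are worth calling out: the sort is declared stable, which preserves insertion order among identical keys, so keep_first ends up keeping the first-inserted value, and the user's memory budget is halved because both sorters are alive at the same time. The split itself is just arithmetic; the value below is an assumed example, the real budget comes from the indexer settings:

    fn main() {
        let max_memory: Option<usize> = Some(2 * 1024 * 1024 * 1024); // e.g. 2 GiB configured
        let per_sorter = max_memory.map(|mem| mem / 2); // 1 GiB for each of the two sorters
        assert_eq!(per_sorter, Some(1024 * 1024 * 1024));
    }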
@@ -802,9 +814,18 @@ impl<'a, 'i> Transform<'a, 'i> {
             }
             let buffer = obkv_writer.into_inner()?;
-            document_sorter_buffer.clear();
-            into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?;
-            original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+            document_sorter_key_buffer.clear();
+            document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+            document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
+            document_sorter_value_buffer.clear();
+            into_del_add_obkv(
+                KvReaderU16::new(buffer),
+                false,
+                true,
+                &mut document_sorter_value_buffer,
+            )?;
+            original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
 
             // Once we have the document. We're going to flatten it
             // and insert it in the flattened sorter.
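The sorter key built above is the internal document id in big-endian bytes followed by the external id bytes, so entries sort by internal id, while the false/true arguments to into_del_add_obkv appear to mark the value as addition-only in the del/add encoding. A standalone sketch of the key layout; the helper name is illustrative, not part of the codebase:

    // Compose the sorter key: big-endian docid first, external id after.
    fn compose_sorter_key(docid: u32, external_id: &str, buffer: &mut Vec<u8>) {
        buffer.clear();
        buffer.extend_from_slice(&docid.to_be_bytes());
        buffer.extend_from_slice(external_id.as_bytes());
    }

    fn main() {
        let mut key = Vec::new();
        compose_sorter_key(7, "movie-42", &mut key);
        assert_eq!(&key[..4], &[0, 0, 0, 7]); // big-endian docid keeps numeric order
        assert_eq!(&key[4..], b"movie-42");
    }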
@@ -839,18 +860,27 @@ impl<'a, 'i> Transform<'a, 'i> {
                 let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
                 writer.insert(fid, &value)?;
             }
-            document_sorter_buffer.clear();
-            into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?;
-            flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+            document_sorter_value_buffer.clear();
+            into_del_add_obkv(
+                KvReaderU16::new(&buffer),
+                false,
+                true,
+                &mut document_sorter_value_buffer,
+            )?;
+            flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
         }
 
-        // Once we have written all the documents, we extract
-        // the file and reset the seek to be able to read it again.
-        let mut original_documents = original_writer.into_inner()?;
-        original_documents.rewind()?;
+        let grenad_params = GrenadParameters {
+            chunk_compression_type: self.indexer_settings.chunk_compression_type,
+            chunk_compression_level: self.indexer_settings.chunk_compression_level,
+            max_memory: self.indexer_settings.max_memory,
+            max_nb_chunks: self.indexer_settings.max_nb_chunks, // default value, may be chosen.
+        };
 
-        let mut flattened_documents = flattened_writer.into_inner()?;
-        flattened_documents.rewind()?;
+        // Once we have written all the documents, we merge everything into a Reader.
+        let original_documents = sorter_into_reader(original_sorter, grenad_params)?;
+        let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;
 
         let output = TransformOutput {
             primary_key,
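With both sorters filled, finalization no longer rewinds hand-managed writer files: sorter_into_reader merges each sorter's chunks, using the GrenadParameters built above, into a grenad reader backed by a temporary file, and TransformOutput below stores the raw files recovered from those readers. A sketch of that last unwrapping step, assuming the reader type is grenad::Reader<BufReader<File>> as the double into_inner() call suggests:

    use std::{fs::File, io::BufReader};

    // grenad::Reader -> BufReader<File> -> File, mirroring the
    // `.into_inner().into_inner()` calls that fill TransformOutput below.
    fn reader_into_file(reader: grenad::Reader<BufReader<File>>) -> File {
        reader.into_inner().into_inner()
    }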
@@ -862,10 +892,8 @@ impl<'a, 'i> Transform<'a, 'i> {
             // FIXME: remove this now unused field
             replaced_documents_ids: RoaringBitmap::default(),
             documents_count,
-            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
-            flattened_documents: flattened_documents
-                .into_inner()
-                .map_err(|err| err.into_error())?,
+            original_documents: original_documents.into_inner().into_inner(),
+            flattened_documents: flattened_documents.into_inner().into_inner(),
         };
 
         let new_facets = output.compute_real_facets(wtxn, self.index)?;