Mirror of https://github.com/meilisearch/MeiliSearch, synced 2024-12-23 21:20:24 +01:00

Merge pull request #4179 from meilisearch/diff-indexing-fix-nested-primary-key

Diff indexing fix nested primary key

Merge commit: 1c39459cf4
milli/src/external_documents_ids.rs

@@ -74,10 +74,6 @@ impl ExternalDocumentsIds
         for DocumentOperation { external_id, internal_id, kind } in operations {
             match kind {
                 DocumentOperationKind::Create => {
-                    // TODO should we get before insert to be able to detect bugs?
-                    // if matches!(kind, DocumentOperationKind::Create) {
-                    //     panic!("Attempting to create an already-existing document");
-                    // }
                     self.0.put(wtxn, &external_id, &BEU32::new(internal_id))?;
                 }
                 DocumentOperationKind::Delete => {
@@ -90,6 +86,11 @@ impl ExternalDocumentsIds
         Ok(())
     }

+    /// Returns an iterator over all the external ids.
+    pub fn iter<'t>(&self, rtxn: &'t RoTxn) -> heed::Result<RoIter<'t, Str, OwnedType<BEU32>>> {
+        self.0.iter(rtxn)
+    }
+
 }

 /// An iterator over mappings between requested internal ids and external ids.
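The new `iter` method exposes the whole external-id to internal-id mapping as a plain heed iterator over the underlying Database<Str, OwnedType<BEU32>>. A minimal usage sketch, assuming the usual milli Index and read-transaction setup (not shown in this diff):

    // Walk every (external_id, internal_id) pair in the index.
    let external_documents_ids = index.external_documents_ids();
    for entry in external_documents_ids.iter(&rtxn)? {
        let (external_id, internal_id) = entry?;
        // external_id: &str, internal_id: BEU32 (a big-endian wrapper around u32)
        println!("{external_id} -> {}", internal_id.get());
    }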
milli/src/update/index_documents/extract/extract_vector_points.rs

@@ -7,7 +7,8 @@ use serde_json::{from_slice, Value};

 use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
 use crate::error::UserError;
-use crate::{FieldId, InternalError, Result, VectorOrArrayOfVectors};
+use crate::update::index_documents::helpers::try_split_at;
+use crate::{DocumentId, FieldId, InternalError, Result, VectorOrArrayOfVectors};

 /// Extracts the embedding vector contained in each document under the `_vectors` field.
 ///
@@ -16,7 +17,6 @@ use crate::{FieldId, InternalError, Result, VectorOrArrayOfVectors};
 pub fn extract_vector_points<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
-    primary_key_id: FieldId,
     vectors_fid: FieldId,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
@@ -28,15 +28,17 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
     );

     let mut cursor = obkv_documents.into_cursor()?;
-    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
+    while let Some((key, value)) = cursor.move_on_next()? {
+        // this must always be serialized as (docid, external_docid);
+        let (docid_bytes, external_id_bytes) =
+            try_split_at(key, std::mem::size_of::<DocumentId>()).unwrap();
+        debug_assert!(std::str::from_utf8(external_id_bytes).is_ok());
+
         let obkv = obkv::KvReader::new(value);

-        // since we only need the primary key when we throw an error we create this getter to
+        // since we only need the primary key when we throw an error we create this getter to
         // lazily get it when needed
-        let document_id = || -> Value {
-            let document_id = obkv.get(primary_key_id).unwrap();
-            from_slice(document_id).unwrap()
-        };
+        let document_id = || -> Value { std::str::from_utf8(external_id_bytes).unwrap().into() };

         // first we retrieve the _vectors field
         if let Some(vectors) = obkv.get(vectors_fid) {
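The grenad chunk fed to the vector extractor is now keyed by the concatenation of the internal document id (a big-endian u32) and the raw external document id, which is why the error getter can recover the external id without touching the primary-key field. A self-contained sketch of this key layout using std only (the crate's try_split_at helper returns an Option of two byte slices rather than parsed values):

    // Key layout: [ internal docid as u32 big-endian | external docid as UTF-8 ].
    type DocumentId = u32;

    fn split_key(key: &[u8]) -> Option<(DocumentId, &str)> {
        let mid = std::mem::size_of::<DocumentId>();
        if key.len() < mid {
            return None;
        }
        let (docid_bytes, external_bytes) = key.split_at(mid);
        let docid = DocumentId::from_be_bytes(docid_bytes.try_into().ok()?);
        let external_id = std::str::from_utf8(external_bytes).ok()?;
        Some((docid, external_id))
    }

    fn main() {
        let mut key = 42u32.to_be_bytes().to_vec();
        key.extend_from_slice(b"movie-42");
        assert_eq!(split_key(&key), Some((42, "movie-42")));
    }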
milli/src/update/index_documents/extract/mod.rs

@@ -63,7 +63,6 @@ pub(crate) fn data_from_obkv_documents(
                     indexer,
                     lmdb_writer_sx.clone(),
                     vectors_field_id,
-                    primary_key_id,
                 )
             })
             .collect::<Result<()>>()?;
@@ -274,7 +273,6 @@ fn send_original_documents_data(
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     vectors_field_id: Option<FieldId>,
-    primary_key_id: FieldId,
 ) -> Result<()> {
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@@ -283,12 +281,7 @@ fn send_original_documents_data(
         let documents_chunk_cloned = original_documents_chunk.clone();
         let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
         rayon::spawn(move || {
-            let result = extract_vector_points(
-                documents_chunk_cloned,
-                indexer,
-                primary_key_id,
-                vectors_field_id,
-            );
+            let result = extract_vector_points(documents_chunk_cloned, indexer, vectors_field_id);
             let _ = match result {
                 Ok(vector_points) => {
                     lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints(vector_points)))
milli/src/update/index_documents/mod.rs

@@ -1387,6 +1387,8 @@ mod tests {
         index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap();

         let rtxn = index.read_txn().unwrap();
+        let all_documents_count = index.all_documents(&rtxn).unwrap().count();
+        assert_eq!(all_documents_count, 1);
         let external_documents_ids = index.external_documents_ids();
         assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some());
     }
milli/src/update/index_documents/transform.rs

@@ -14,14 +14,15 @@ use serde_json::Value;
 use smartstring::SmartString;

 use super::helpers::{
-    create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions,
-    obkvs_merge_additions_and_deletions, MergeFn,
+    create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions,
+    obkvs_merge_additions_and_deletions, sorter_into_reader, MergeFn,
 };
 use super::{IndexDocumentsMethod, IndexerConfig};
 use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
 use crate::index::{db_name, main_key};
 use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd};
+use crate::update::index_documents::GrenadParameters;
 use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
 use crate::{
     FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
@@ -174,7 +175,8 @@ impl<'a, 'i> Transform<'a, 'i> {
             self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;

         let mut obkv_buffer = Vec::new();
-        let mut document_sorter_buffer = Vec::new();
+        let mut document_sorter_value_buffer = Vec::new();
+        let mut document_sorter_key_buffer = Vec::new();
         let mut documents_count = 0;
         let mut docid_buffer: Vec<u8> = Vec::new();
         let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
@@ -268,57 +270,66 @@ impl<'a, 'i> Transform<'a, 'i> {
                     // we associate the base document with the new key, everything will get merged later.
                     let keep_original_version =
                         self.index_documents_method == IndexDocumentsMethod::UpdateDocuments;
-                    document_sorter_buffer.clear();
-                    document_sorter_buffer.push(Operation::Addition as u8);
+                    document_sorter_key_buffer.clear();
+                    document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+                    document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
+                    document_sorter_value_buffer.clear();
+                    document_sorter_value_buffer.push(Operation::Addition as u8);
                     into_del_add_obkv(
                         KvReaderU16::new(base_obkv),
                         true,
                         keep_original_version,
-                        &mut document_sorter_buffer,
+                        &mut document_sorter_value_buffer,
                     )?;
-                    self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+                    self.original_sorter
+                        .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
                     let base_obkv = KvReader::new(base_obkv);
                     if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? {
                         // we recreate our buffer with the flattened documents
-                        document_sorter_buffer.clear();
-                        document_sorter_buffer.push(Operation::Addition as u8);
+                        document_sorter_value_buffer.clear();
+                        document_sorter_value_buffer.push(Operation::Addition as u8);
                         into_del_add_obkv(
                             KvReaderU16::new(&flattened_obkv),
                             true,
                             keep_original_version,
-                            &mut document_sorter_buffer,
+                            &mut document_sorter_value_buffer,
                         )?;
                     }
-                    self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+                    self.flattened_sorter
+                        .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
                 }
             }

             if !skip_insertion {
                 self.new_documents_ids.insert(docid);

-                document_sorter_buffer.clear();
-                document_sorter_buffer.push(Operation::Addition as u8);
+                document_sorter_key_buffer.clear();
+                document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+                document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
+                document_sorter_value_buffer.clear();
+                document_sorter_value_buffer.push(Operation::Addition as u8);
                 into_del_add_obkv(
                     KvReaderU16::new(&obkv_buffer),
                     false,
                     true,
-                    &mut document_sorter_buffer,
+                    &mut document_sorter_value_buffer,
                 )?;
                 // We use the extracted/generated user id as the key for this document.
-                self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+                self.original_sorter
+                    .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

                 let flattened_obkv = KvReader::new(&obkv_buffer);
                 if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
-                    document_sorter_buffer.clear();
-                    document_sorter_buffer.push(Operation::Addition as u8);
+                    document_sorter_value_buffer.clear();
+                    document_sorter_value_buffer.push(Operation::Addition as u8);
                     into_del_add_obkv(
                         KvReaderU16::new(&obkv),
                         false,
                         true,
-                        &mut document_sorter_buffer,
+                        &mut document_sorter_value_buffer,
                     )?
                 }
-                self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+                self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
             }
             documents_count += 1;

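For reference, the records pushed into the sorters above now have two shapes: original_sorter keys are the internal docid (big-endian u32) concatenated with the external id, while flattened_sorter still keys on the bare docid; both values are a one-byte Operation tag followed by the del/add obkv payload produced by into_del_add_obkv. A standalone sketch of that value encoding (the tag constants are illustrative, not the crate's actual Operation discriminants):

    const ADDITION: u8 = 0; // illustrative tag values, not the real discriminants
    const DELETION: u8 = 1;

    /// Encode: one operation byte, then the serialized del/add obkv payload.
    fn encode_sorter_value(op: u8, obkv_payload: &[u8], buffer: &mut Vec<u8>) {
        buffer.clear();
        buffer.push(op);
        buffer.extend_from_slice(obkv_payload);
    }

    /// Decode: split the operation tag back off the payload.
    fn decode_sorter_value(value: &[u8]) -> Option<(u8, &[u8])> {
        value.split_first().map(|(op, payload)| (*op, payload))
    }

    fn main() {
        let mut buffer = Vec::new();
        encode_sorter_value(ADDITION, b"obkv-bytes", &mut buffer);
        assert_eq!(decode_sorter_value(&buffer), Some((ADDITION, &b"obkv-bytes"[..])));
    }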
@@ -372,37 +383,42 @@ impl<'a, 'i> Transform<'a, 'i> {
         let external_documents_ids = self.index.external_documents_ids();

         let mut documents_deleted = 0;
-        let mut document_sorter_buffer = Vec::new();
+        let mut document_sorter_value_buffer = Vec::new();
+        let mut document_sorter_key_buffer = Vec::new();
         for to_remove in to_remove {
             if should_abort() {
                 return Err(Error::InternalError(InternalError::AbortedIndexation));
             }

             // Check if the document has been added in the current indexing process.
-            let deleted_from_current = match self
-                .new_external_documents_ids_builder
-                .entry((*to_remove).into())
-            {
-                // if the document was added in a previous iteration of the transform we mark it as deleted in the sorters.
-                HEntry::Occupied(entry) => {
-                    let doc_id = *entry.get() as u32;
-                    document_sorter_buffer.clear();
-                    document_sorter_buffer.push(Operation::Deletion as u8);
-                    obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap();
-                    self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
-                    self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
+            let deleted_from_current =
+                match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
+                    // if the document was added in a previous iteration of the transform we mark it as deleted in the sorters.
+                    HEntry::Occupied(entry) => {
+                        let docid = *entry.get() as u32;
+                        // Key is the concatenation of the internal docid and the external one.
+                        document_sorter_key_buffer.clear();
+                        document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+                        document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes());
+                        document_sorter_value_buffer.clear();
+                        document_sorter_value_buffer.push(Operation::Deletion as u8);
+                        obkv::KvWriterU16::new(&mut document_sorter_value_buffer).finish().unwrap();
+                        self.original_sorter
+                            .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
+                        self.flattened_sorter
+                            .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;

-                    // we must NOT update the list of replaced_documents_ids
-                    // Either:
-                    // 1. It's already in it and there is nothing to do
-                    // 2. It wasn't in it because the document was created by a previous batch and since
-                    //    we're removing it there is nothing to do.
-                    self.new_documents_ids.remove(doc_id);
-                    entry.remove_entry();
-                    true
-                }
-                HEntry::Vacant(_) => false,
-            };
+                        // we must NOT update the list of replaced_documents_ids
+                        // Either:
+                        // 1. It's already in it and there is nothing to do
+                        // 2. It wasn't in it because the document was created by a previous batch and since
+                        //    we're removing it there is nothing to do.
+                        self.new_documents_ids.remove(docid);
+                        entry.remove_entry();
+                        true
+                    }
+                    HEntry::Vacant(_) => false,
+                };

             // If the document was already in the db we mark it as a `to_delete` document.
             // Then we push the document in sorters in deletion mode.
@@ -422,31 +438,37 @@ impl<'a, 'i> Transform<'a, 'i> {
                     key: None,
                 })?;

+                // Key is the concatenation of the internal docid and the external one.
+                document_sorter_key_buffer.clear();
+                document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+                document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes());
                 // push it as to delete in the original_sorter
-                document_sorter_buffer.clear();
-                document_sorter_buffer.push(Operation::Deletion as u8);
+                document_sorter_value_buffer.clear();
+                document_sorter_value_buffer.push(Operation::Deletion as u8);
                 into_del_add_obkv(
                     KvReaderU16::new(base_obkv),
                     true,
                     false,
-                    &mut document_sorter_buffer,
+                    &mut document_sorter_value_buffer,
                 )?;
-                self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+                self.original_sorter
+                    .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

                 // flatten it and push it as to delete in the flattened_sorter
                 let flattened_obkv = KvReader::new(base_obkv);
                 if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
                     // we recreate our buffer with the flattened documents
-                    document_sorter_buffer.clear();
-                    document_sorter_buffer.push(Operation::Deletion as u8);
+                    document_sorter_value_buffer.clear();
+                    document_sorter_value_buffer.push(Operation::Deletion as u8);
                     into_del_add_obkv(
                         KvReaderU16::new(&obkv),
                         true,
                         false,
-                        &mut document_sorter_buffer,
+                        &mut document_sorter_value_buffer,
                     )?;
                 }
-                self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+                self.flattened_sorter
+                    .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;

                 true
             }
@@ -754,24 +776,35 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_ids = self.index.documents_ids(wtxn)?;
         let documents_count = documents_ids.len() as usize;

-        // We create a final writer to write the new documents in order from the sorter.
-        let mut original_writer = create_writer(
+        // We initialize the sorter with the user indexing settings.
+        let mut original_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            keep_first,
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            tempfile::tempfile()?,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
         );

-        // We create a final writer to write the new documents in order from the sorter.
-        let mut flattened_writer = create_writer(
+        // We initialize the sorter with the user indexing settings.
+        let mut flattened_sorter = create_sorter(
+            grenad::SortAlgorithm::Stable,
+            keep_first,
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            tempfile::tempfile()?,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
         );

         let mut obkv_buffer = Vec::new();
-        let mut document_sorter_buffer = Vec::new();
-        for result in self.index.all_documents(wtxn)? {
-            let (docid, obkv) = result?;
+        let mut document_sorter_key_buffer = Vec::new();
+        let mut document_sorter_value_buffer = Vec::new();
+        for result in self.index.external_documents_ids().iter(wtxn)? {
+            let (external_id, docid) = result?;
+            let obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
+                InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
+            )?;
+            let docid = docid.get();

             obkv_buffer.clear();
             let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);
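Two details in this switch from plain writers to sorters are easy to miss: max_memory is halved because both sorters are alive at the same time, and keep_first resolves duplicate keys by keeping the first value seen for each key. Roughly what such a merge function looks like (a sketch; the crate's MergeFn signature may differ, e.g. in its error type):

    use std::borrow::Cow;

    /// When the sorter sees the same key several times, keep the first value.
    /// grenad only calls a merge function with a non-empty list of values.
    fn keep_first<'a>(
        _key: &[u8],
        values: &[Cow<'a, [u8]>],
    ) -> Result<Cow<'a, [u8]>, std::convert::Infallible> {
        Ok(values[0].clone())
    }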
@@ -784,9 +817,18 @@ impl<'a, 'i> Transform<'a, 'i> {
             }

             let buffer = obkv_writer.into_inner()?;
-            document_sorter_buffer.clear();
-            into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?;
-            original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+
+            document_sorter_key_buffer.clear();
+            document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
+            document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
+            document_sorter_value_buffer.clear();
+            into_del_add_obkv(
+                KvReaderU16::new(buffer),
+                false,
+                true,
+                &mut document_sorter_value_buffer,
+            )?;
+            original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

             // Once we have the document, we're going to flatten it
             // and insert it in the flattened sorter.
@@ -821,18 +863,27 @@ impl<'a, 'i> Transform<'a, 'i> {
                 let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
                 writer.insert(fid, &value)?;
             }
-            document_sorter_buffer.clear();
-            into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?;
-            flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
+            document_sorter_value_buffer.clear();
+            into_del_add_obkv(
+                KvReaderU16::new(&buffer),
+                false,
+                true,
+                &mut document_sorter_value_buffer,
+            )?;
+            flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
         }

-        // Once we have written all the documents, we extract
-        // the file and reset the seek to be able to read it again.
-        let mut original_documents = original_writer.into_inner()?;
-        original_documents.rewind()?;
+        let grenad_params = GrenadParameters {
+            chunk_compression_type: self.indexer_settings.chunk_compression_type,
+            chunk_compression_level: self.indexer_settings.chunk_compression_level,
+            max_memory: self.indexer_settings.max_memory,
+            max_nb_chunks: self.indexer_settings.max_nb_chunks, // default value, may be chosen.
+        };

-        let mut flattened_documents = flattened_writer.into_inner()?;
-        flattened_documents.rewind()?;
+        // Once we have written all the documents, we merge everything into a Reader.
+        let original_documents = sorter_into_reader(original_sorter, grenad_params)?;
+        let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;

         let output = TransformOutput {
             primary_key,
@@ -844,10 +895,8 @@ impl<'a, 'i> Transform<'a, 'i> {
             // FIXME: remove this now unused field
             replaced_documents_ids: RoaringBitmap::default(),
             documents_count,
-            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
-            flattened_documents: flattened_documents
-                .into_inner()
-                .map_err(|err| err.into_error())?,
+            original_documents: original_documents.into_inner().into_inner(),
+            flattened_documents: flattened_documents.into_inner().into_inner(),
         };

         let new_facets = output.compute_real_facets(wtxn, self.index)?;
milli/src/update/index_documents/typed_chunk.rs

@@ -17,6 +17,7 @@ use crate::distance::NDotProductPoint;
 use crate::error::UserError;
 use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
 use crate::facet::FacetType;
+use crate::index::db_name::DOCUMENTS;
 use crate::index::Hnsw;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd};
 use crate::update::facet::FacetsUpdate;
@@ -24,7 +25,7 @@ use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_arr
 use crate::update::index_documents::validate_document_id_value;
 use crate::{
     lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
-    Result, BEU32,
+    Result, SerializationError, BEU32,
 };

 pub(crate) enum TypedChunk {
@@ -124,13 +125,15 @@ pub(crate) fn write_typed_chunk_into_index(
             let mut operations: Vec<DocumentOperation> = Default::default();

             let mut docids = index.documents_ids(wtxn)?;
-            let primary_key = index.primary_key(wtxn)?.unwrap();
-            let primary_key = index.fields_ids_map(wtxn)?.id(primary_key).unwrap();
             let mut cursor = obkv_documents_iter.into_cursor()?;
-            while let Some((docid, reader)) = cursor.move_on_next()? {
+            while let Some((key, reader)) = cursor.move_on_next()? {
                 let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
                 let reader: KvReader<FieldId> = KvReader::new(reader);
-                let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap();
+                let (document_id_bytes, external_id_bytes) = try_split_array_at(key)
+                    .ok_or(SerializationError::Decoding { db_name: Some(DOCUMENTS) })?;
+                let docid = DocumentId::from_be_bytes(document_id_bytes);
+                let external_id = std::str::from_utf8(external_id_bytes)?;

                 for (field_id, value) in reader.iter() {
                     let del_add_reader = KvReaderDelAdd::new(value);
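Decoding here mirrors the encoding done in the transform: a fixed-size big-endian docid prefix, then the UTF-8 external id, with a proper SerializationError instead of an unwrap when the key is malformed. A hypothetical standalone version of a try_split_array_at-style helper (the real one lives in index_documents/helpers and may differ in shape):

    /// Peel a fixed-size [u8; N] prefix off a slice; None if the slice is too short.
    fn try_split_array_at<const N: usize>(slice: &[u8]) -> Option<([u8; N], &[u8])> {
        if slice.len() < N {
            return None;
        }
        let (head, tail) = slice.split_at(N);
        Some((head.try_into().ok()?, tail))
    }

    fn main() {
        let mut key = 7u32.to_be_bytes().to_vec();
        key.extend_from_slice(b"doc-7");
        let (docid_bytes, external_id) = try_split_array_at::<4>(&key).unwrap();
        assert_eq!(u32::from_be_bytes(docid_bytes), 7);
        assert_eq!(external_id, &b"doc-7"[..]);
    }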
@@ -140,45 +143,10 @@ pub(crate) fn write_typed_chunk_into_index(
                 ) {
                     (None, None) => {}
                     (None, Some(value)) => {
-                        // if primary key, new document
-                        if field_id == primary_key {
-                            // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk
-                            // rather than re-extract it here
-                            // FIXME: unwraps
-                            let document_id = serde_json::from_slice(value)
-                                .map_err(InternalError::SerdeJson)
-                                .unwrap();
-                            let external_id =
-                                validate_document_id_value(document_id).unwrap().unwrap();
-                            operations.push(DocumentOperation {
-                                external_id,
-                                internal_id: docid,
-                                kind: DocumentOperationKind::Create,
-                            });
-                            docids.insert(docid);
-                        }
                         // anyway, write
                         writer.insert(field_id, value)?;
                     }
-                    (Some(value), None) => {
-                        // if primary key, deleted document
-                        if field_id == primary_key {
-                            // FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk
-                            // rather than re-extract it here
-                            // FIXME: unwraps
-                            let document_id = serde_json::from_slice(value)
-                                .map_err(InternalError::SerdeJson)
-                                .unwrap();
-                            let external_id =
-                                validate_document_id_value(document_id).unwrap().unwrap();
-                            operations.push(DocumentOperation {
-                                external_id,
-                                internal_id: docid,
-                                kind: DocumentOperationKind::Delete,
-                            });
-                            docids.remove(docid);
-                        }
-                    }
+                    (Some(_), None) => {}
                     (Some(_), Some(value)) => {
                         // updated field, write
                         writer.insert(field_id, value)?;
@@ -190,8 +158,20 @@ pub(crate) fn write_typed_chunk_into_index(

                 if !writer.is_empty() {
                     db.put(wtxn, &BEU32::new(docid), &writer.into_inner().unwrap())?;
+                    operations.push(DocumentOperation {
+                        external_id: external_id.to_string(),
+                        internal_id: docid,
+                        kind: DocumentOperationKind::Create,
+                    });
+                    docids.insert(docid);
                 } else {
                     db.delete(wtxn, &BEU32::new(docid))?;
+                    operations.push(DocumentOperation {
+                        external_id: external_id.to_string(),
+                        internal_id: docid,
+                        kind: DocumentOperationKind::Delete,
+                    });
+                    docids.remove(docid);
                 }
             }
             let external_documents_docids = index.external_documents_ids();
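The heart of the fix: Create/Delete bookkeeping no longer piggybacks on spotting the primary-key field while iterating the obkv, which silently failed for nested primary keys, and is instead driven by whether the merged document still has any fields, using the external id carried in the chunk key. With a nested primary key such as "a.b.c" the raw document only has the top-level field "a", so the old per-field check never fired. A small illustration of that mismatch (requires serde_json, which this file already uses):

    use serde_json::json;

    fn main() {
        let doc = json!({ "a": { "b": { "c": 1 } } });
        let object = doc.as_object().unwrap();
        // The unflattened document exposes only "a"; "a.b.c" exists only after flattening.
        let top_level: Vec<&str> = object.keys().map(|k| k.as_str()).collect();
        assert_eq!(top_level, ["a"]);
        assert!(object.get("a.b.c").is_none());
    }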