2022-10-12 13:24:56 +02:00
|
|
|
use std::collections::HashMap;
|
2021-09-09 12:20:08 +02:00
|
|
|
use std::convert::TryInto;
|
2021-08-16 13:36:30 +02:00
|
|
|
use std::fs::File;
|
2023-09-28 16:26:01 +02:00
|
|
|
use std::io::{self, BufReader};
|
2021-08-16 13:36:30 +02:00
|
|
|
|
2023-06-08 12:19:06 +02:00
|
|
|
use bytemuck::allocation::pod_collect_to_vec;
|
2022-10-12 13:24:56 +02:00
|
|
|
use charabia::{Language, Script};
|
2022-03-24 15:22:57 +01:00
|
|
|
use grenad::MergerBuilder;
|
2021-08-16 13:36:30 +02:00
|
|
|
use heed::types::ByteSlice;
|
2023-06-07 10:02:21 +02:00
|
|
|
use heed::RwTxn;
|
2023-10-24 11:03:35 +02:00
|
|
|
use obkv::{KvReader, KvWriter};
|
2021-08-16 13:36:30 +02:00
|
|
|
use roaring::RoaringBitmap;
|
|
|
|
|
2023-10-23 15:19:33 +02:00
|
|
|
use super::helpers::{self, merge_ignore_values, valid_lmdb_key, CursorClonableMmap};
|
2022-03-24 15:22:57 +01:00
|
|
|
use super::{ClonableMmap, MergeFn};
|
2023-07-25 12:36:01 +02:00
|
|
|
use crate::distance::NDotProductPoint;
|
2023-06-14 16:34:09 +02:00
|
|
|
use crate::error::UserError;
|
2023-10-24 17:04:48 +02:00
|
|
|
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
|
2022-09-05 12:52:05 +02:00
|
|
|
use crate::facet::FacetType;
|
2023-07-25 12:36:01 +02:00
|
|
|
use crate::index::Hnsw;
|
2023-10-19 10:47:00 +02:00
|
|
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
2022-09-05 12:52:05 +02:00
|
|
|
use crate::update::facet::FacetsUpdate;
|
2023-06-20 11:17:20 +02:00
|
|
|
use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
|
2023-10-24 17:04:48 +02:00
|
|
|
use crate::update::index_documents::validate_document_id_value;
|
2023-10-24 11:03:35 +02:00
|
|
|
use crate::{
|
2023-10-24 17:04:48 +02:00
|
|
|
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
|
|
|
|
Result, BEU32,
|
2023-10-24 11:03:35 +02:00
|
|
|
};
|
2021-08-16 13:36:30 +02:00
|
|
|
|
|
|
|
/// A chunk of index data of one specific kind, produced by the extraction
/// phase and consumed by `write_typed_chunk_into_index`.
///
/// Most variants wrap a sorted `grenad::Reader` whose entries are written
/// into the corresponding LMDB database of the index.
pub(crate) enum TypedChunk {
    /// Entries for the `field_id_docid_facet_strings` database.
    FieldIdDocidFacetStrings(grenad::Reader<CursorClonableMmap>),
    /// Entries for the `field_id_docid_facet_f64s` database.
    FieldIdDocidFacetNumbers(grenad::Reader<CursorClonableMmap>),
    /// Documents as obkv, keyed by internal document id; values are DelAdd
    /// obkvs per field (see how they are consumed in the write function).
    Documents(grenad::Reader<CursorClonableMmap>),
    /// Entries for the `field_id_word_count_docids` database.
    FieldIdWordCountDocids(grenad::Reader<BufReader<File>>),
    /// The three word-docids readers that are written together and merged
    /// into the words fst.
    WordDocids {
        word_docids_reader: grenad::Reader<BufReader<File>>,
        exact_word_docids_reader: grenad::Reader<BufReader<File>>,
        word_fid_docids_reader: grenad::Reader<BufReader<File>>,
    },
    /// Entries for the `word_position_docids` database.
    WordPositionDocids(grenad::Reader<BufReader<File>>),
    /// Entries for the `word_pair_proximity_docids` database.
    WordPairProximityDocids(grenad::Reader<BufReader<File>>),
    /// Facet string entries, applied through `FacetsUpdate`.
    FieldIdFacetStringDocids(grenad::Reader<BufReader<File>>),
    /// Facet number entries, applied through `FacetsUpdate`.
    FieldIdFacetNumberDocids(grenad::Reader<BufReader<File>>),
    /// Entries for the `facet_id_exists_docids` database.
    FieldIdFacetExistsDocids(grenad::Reader<BufReader<File>>),
    /// Entries for the `facet_id_is_null_docids` database.
    FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
    /// Entries for the `facet_id_is_empty_docids` database.
    FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
    /// Geo coordinates per document, inserted into/removed from the rtree.
    GeoPoints(grenad::Reader<BufReader<File>>),
    /// Embedding vectors per document, rebuilt into the HNSW.
    VectorPoints(grenad::Reader<BufReader<File>>),
    /// Per (script, language) pair, the (deleted, added) docids bitmaps.
    ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
}
|
|
|
|
|
2023-07-10 18:41:54 +02:00
|
|
|
impl TypedChunk {
|
|
|
|
pub fn to_debug_string(&self) -> String {
|
|
|
|
match self {
|
|
|
|
TypedChunk::FieldIdDocidFacetStrings(grenad) => {
|
|
|
|
format!("FieldIdDocidFacetStrings {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::FieldIdDocidFacetNumbers(grenad) => {
|
|
|
|
format!("FieldIdDocidFacetNumbers {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::Documents(grenad) => {
|
|
|
|
format!("Documents {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
2023-10-19 10:38:58 +02:00
|
|
|
TypedChunk::FieldIdWordCountDocids(grenad) => {
|
2023-07-10 18:41:54 +02:00
|
|
|
format!("FieldIdWordcountDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
2023-09-18 09:59:38 +02:00
|
|
|
TypedChunk::WordDocids {
|
|
|
|
word_docids_reader,
|
|
|
|
exact_word_docids_reader,
|
|
|
|
word_fid_docids_reader,
|
|
|
|
} => format!(
|
|
|
|
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {}, word_fid_docids_reader: {} }}",
|
2023-07-10 18:41:54 +02:00
|
|
|
word_docids_reader.len(),
|
2023-09-18 09:59:38 +02:00
|
|
|
exact_word_docids_reader.len(),
|
|
|
|
word_fid_docids_reader.len()
|
2023-07-10 18:41:54 +02:00
|
|
|
),
|
|
|
|
TypedChunk::WordPositionDocids(grenad) => {
|
|
|
|
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::WordPairProximityDocids(grenad) => {
|
|
|
|
format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::FieldIdFacetStringDocids(grenad) => {
|
|
|
|
format!("FieldIdFacetStringDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::FieldIdFacetNumberDocids(grenad) => {
|
|
|
|
format!("FieldIdFacetNumberDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::FieldIdFacetExistsDocids(grenad) => {
|
|
|
|
format!("FieldIdFacetExistsDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::FieldIdFacetIsNullDocids(grenad) => {
|
|
|
|
format!("FieldIdFacetIsNullDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::FieldIdFacetIsEmptyDocids(grenad) => {
|
|
|
|
format!("FieldIdFacetIsEmptyDocids {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::GeoPoints(grenad) => {
|
|
|
|
format!("GeoPoints {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
|
|
|
TypedChunk::VectorPoints(grenad) => {
|
|
|
|
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
|
|
|
|
}
|
2023-10-19 10:22:39 +02:00
|
|
|
TypedChunk::ScriptLanguageDocids(sl_map) => {
|
|
|
|
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", sl_map.len())
|
2023-07-10 18:41:54 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-16 13:36:30 +02:00
|
|
|
/// Write typed chunk in the corresponding LMDB database of the provided index.
|
|
|
|
/// Return new documents seen.
|
|
|
|
pub(crate) fn write_typed_chunk_into_index(
|
|
|
|
typed_chunk: TypedChunk,
|
|
|
|
index: &Index,
|
|
|
|
wtxn: &mut RwTxn,
|
|
|
|
index_is_empty: bool,
|
2021-08-17 10:56:06 +02:00
|
|
|
) -> Result<(RoaringBitmap, bool)> {
|
2023-07-10 18:41:54 +02:00
|
|
|
puffin::profile_function!(typed_chunk.to_debug_string());
|
|
|
|
|
2021-08-17 10:56:06 +02:00
|
|
|
let mut is_merged_database = false;
|
2021-08-16 13:36:30 +02:00
|
|
|
match typed_chunk {
|
2022-02-16 15:28:48 +01:00
|
|
|
TypedChunk::Documents(obkv_documents_iter) => {
|
2023-10-24 17:04:48 +02:00
|
|
|
let mut operations: Vec<DocumentOperation> = Default::default();
|
2023-10-24 14:26:49 +02:00
|
|
|
|
2023-10-24 17:04:48 +02:00
|
|
|
let mut docids = index.documents_ids(wtxn)?;
|
|
|
|
let primary_key = index.primary_key(wtxn)?.unwrap();
|
|
|
|
let primary_key = index.fields_ids_map(wtxn)?.id(primary_key).unwrap();
|
2022-02-16 15:28:48 +01:00
|
|
|
let mut cursor = obkv_documents_iter.into_cursor()?;
|
2023-10-24 11:03:35 +02:00
|
|
|
while let Some((docid, reader)) = cursor.move_on_next()? {
|
|
|
|
let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
|
|
|
|
let reader: KvReader<FieldId> = KvReader::new(reader);
|
2023-10-24 17:04:48 +02:00
|
|
|
let docid = docid.try_into().map(DocumentId::from_be_bytes).unwrap();
|
|
|
|
|
2023-10-24 11:03:35 +02:00
|
|
|
for (field_id, value) in reader.iter() {
|
2023-10-24 17:04:48 +02:00
|
|
|
let del_add_reader = KvReaderDelAdd::new(value);
|
|
|
|
match (
|
|
|
|
del_add_reader.get(DelAdd::Deletion),
|
|
|
|
del_add_reader.get(DelAdd::Addition),
|
|
|
|
) {
|
|
|
|
(None, None) => {}
|
|
|
|
(None, Some(value)) => {
|
|
|
|
// if primary key, new document
|
|
|
|
if field_id == primary_key {
|
|
|
|
// FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk
|
|
|
|
// rather than re-extract it here
|
|
|
|
// FIXME: unwraps
|
|
|
|
let document_id = serde_json::from_slice(value)
|
|
|
|
.map_err(InternalError::SerdeJson)
|
|
|
|
.unwrap();
|
|
|
|
let external_id =
|
|
|
|
validate_document_id_value(document_id).unwrap().unwrap();
|
|
|
|
operations.push(DocumentOperation {
|
|
|
|
external_id,
|
|
|
|
internal_id: docid,
|
|
|
|
kind: DocumentOperationKind::Create,
|
|
|
|
});
|
|
|
|
docids.insert(docid);
|
|
|
|
}
|
|
|
|
// anyway, write
|
|
|
|
writer.insert(field_id, value)?;
|
|
|
|
}
|
|
|
|
(Some(value), None) => {
|
|
|
|
// if primary key, deleted document
|
|
|
|
if field_id == primary_key {
|
|
|
|
// FIXME: we already extracted the external docid before. We should retrieve it in the typed chunk
|
|
|
|
// rather than re-extract it here
|
|
|
|
// FIXME: unwraps
|
|
|
|
let document_id = serde_json::from_slice(value)
|
|
|
|
.map_err(InternalError::SerdeJson)
|
|
|
|
.unwrap();
|
|
|
|
let external_id =
|
|
|
|
validate_document_id_value(document_id).unwrap().unwrap();
|
|
|
|
operations.push(DocumentOperation {
|
|
|
|
external_id,
|
|
|
|
internal_id: docid,
|
|
|
|
kind: DocumentOperationKind::Delete,
|
|
|
|
});
|
|
|
|
docids.remove(docid);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
(Some(_), Some(value)) => {
|
|
|
|
// updated field, write
|
|
|
|
writer.insert(field_id, value)?;
|
|
|
|
}
|
|
|
|
}
|
2023-10-24 11:03:35 +02:00
|
|
|
}
|
2023-10-24 14:26:49 +02:00
|
|
|
|
|
|
|
let db = index.documents.remap_data_type::<ByteSlice>();
|
|
|
|
|
2023-10-24 17:04:48 +02:00
|
|
|
if !writer.is_empty() {
|
2023-10-24 14:26:49 +02:00
|
|
|
db.put(wtxn, &BEU32::new(docid), &writer.into_inner().unwrap())?;
|
|
|
|
} else {
|
|
|
|
db.delete(wtxn, &BEU32::new(docid))?;
|
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
2023-10-24 17:04:48 +02:00
|
|
|
let mut external_documents_docids = index.external_documents_ids(wtxn)?.into_static();
|
|
|
|
external_documents_docids.apply(operations);
|
|
|
|
index.put_external_documents_ids(wtxn, &external_documents_docids)?;
|
2023-10-24 14:26:49 +02:00
|
|
|
|
|
|
|
index.put_documents_ids(wtxn, &docids)?;
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
2023-10-19 10:38:58 +02:00
|
|
|
TypedChunk::FieldIdWordCountDocids(fid_word_count_docids_iter) => {
|
2021-08-16 13:36:30 +02:00
|
|
|
append_entries_into_database(
|
|
|
|
fid_word_count_docids_iter,
|
|
|
|
&index.field_id_word_count_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-08-16 13:36:30 +02:00
|
|
|
)?;
|
2021-08-17 10:56:06 +02:00
|
|
|
is_merged_database = true;
|
|
|
|
}
|
2023-09-18 09:59:38 +02:00
|
|
|
TypedChunk::WordDocids {
|
|
|
|
word_docids_reader,
|
|
|
|
exact_word_docids_reader,
|
|
|
|
word_fid_docids_reader,
|
|
|
|
} => {
|
2022-03-24 15:22:57 +01:00
|
|
|
let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?;
|
2021-08-16 13:36:30 +02:00
|
|
|
append_entries_into_database(
|
|
|
|
word_docids_iter.clone(),
|
|
|
|
&index.word_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-08-16 13:36:30 +02:00
|
|
|
)?;
|
|
|
|
|
2022-03-24 15:22:57 +01:00
|
|
|
let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?;
|
|
|
|
append_entries_into_database(
|
|
|
|
exact_word_docids_iter.clone(),
|
|
|
|
&index.exact_word_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2022-03-24 15:22:57 +01:00
|
|
|
)?;
|
|
|
|
|
2023-09-18 09:59:38 +02:00
|
|
|
let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
|
|
|
|
append_entries_into_database(
|
|
|
|
word_fid_docids_iter,
|
|
|
|
&index.word_fid_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2023-09-18 09:59:38 +02:00
|
|
|
)?;
|
|
|
|
|
2021-08-16 13:36:30 +02:00
|
|
|
// create fst from word docids
|
2022-03-24 15:22:57 +01:00
|
|
|
let fst = merge_word_docids_reader_into_fst(word_docids_iter, exact_word_docids_iter)?;
|
2021-08-16 13:36:30 +02:00
|
|
|
let db_fst = index.words_fst(wtxn)?;
|
|
|
|
|
|
|
|
// merge new fst with database fst
|
|
|
|
let union_stream = fst.op().add(db_fst.stream()).union();
|
|
|
|
let mut builder = fst::SetBuilder::memory();
|
|
|
|
builder.extend_stream(union_stream)?;
|
|
|
|
let fst = builder.into_set();
|
|
|
|
index.put_words_fst(wtxn, &fst)?;
|
2021-08-17 10:56:06 +02:00
|
|
|
is_merged_database = true;
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
2021-10-05 11:18:42 +02:00
|
|
|
TypedChunk::WordPositionDocids(word_position_docids_iter) => {
|
2021-08-16 13:36:30 +02:00
|
|
|
append_entries_into_database(
|
2021-10-05 11:18:42 +02:00
|
|
|
word_position_docids_iter,
|
|
|
|
&index.word_position_docids,
|
2021-08-16 13:36:30 +02:00
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-08-16 13:36:30 +02:00
|
|
|
)?;
|
2021-08-17 10:56:06 +02:00
|
|
|
is_merged_database = true;
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
2022-09-05 12:52:05 +02:00
|
|
|
TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids_iter) => {
|
|
|
|
let indexer = FacetsUpdate::new(index, FacetType::Number, facet_id_number_docids_iter);
|
|
|
|
indexer.execute(wtxn)?;
|
2021-08-17 10:56:06 +02:00
|
|
|
is_merged_database = true;
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
2022-09-05 12:52:05 +02:00
|
|
|
TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids_iter) => {
|
|
|
|
let indexer = FacetsUpdate::new(index, FacetType::String, facet_id_string_docids_iter);
|
|
|
|
indexer.execute(wtxn)?;
|
2022-09-01 08:17:27 +02:00
|
|
|
is_merged_database = true;
|
|
|
|
}
|
2022-07-19 09:57:28 +02:00
|
|
|
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
|
2022-07-19 14:42:35 +02:00
|
|
|
append_entries_into_database(
|
|
|
|
facet_id_exists_docids,
|
2022-07-19 09:30:19 +02:00
|
|
|
&index.facet_id_exists_docids,
|
|
|
|
wtxn,
|
2022-07-19 14:42:35 +02:00
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2022-07-19 09:57:28 +02:00
|
|
|
)?;
|
2022-07-19 09:30:19 +02:00
|
|
|
is_merged_database = true;
|
|
|
|
}
|
2023-03-08 16:49:53 +01:00
|
|
|
TypedChunk::FieldIdFacetIsNullDocids(facet_id_is_null_docids) => {
|
|
|
|
append_entries_into_database(
|
|
|
|
facet_id_is_null_docids,
|
|
|
|
&index.facet_id_is_null_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2023-03-08 16:49:53 +01:00
|
|
|
)?;
|
|
|
|
is_merged_database = true;
|
|
|
|
}
|
2023-03-14 18:08:12 +01:00
|
|
|
TypedChunk::FieldIdFacetIsEmptyDocids(facet_id_is_empty_docids) => {
|
|
|
|
append_entries_into_database(
|
|
|
|
facet_id_is_empty_docids,
|
|
|
|
&index.facet_id_is_empty_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2023-03-14 18:08:12 +01:00
|
|
|
)?;
|
|
|
|
is_merged_database = true;
|
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => {
|
|
|
|
append_entries_into_database(
|
|
|
|
word_pair_proximity_docids_iter,
|
|
|
|
&index.word_pair_proximity_docids,
|
|
|
|
wtxn,
|
|
|
|
index_is_empty,
|
2023-10-19 10:47:00 +02:00
|
|
|
deladd_serialize_add_side,
|
2023-10-19 11:18:30 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-08-16 13:36:30 +02:00
|
|
|
)?;
|
2021-08-17 10:56:06 +02:00
|
|
|
is_merged_database = true;
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
2022-02-16 15:28:48 +01:00
|
|
|
TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => {
|
2021-08-16 13:36:30 +02:00
|
|
|
let index_fid_docid_facet_numbers =
|
|
|
|
index.field_id_docid_facet_f64s.remap_types::<ByteSlice, ByteSlice>();
|
2022-02-16 15:28:48 +01:00
|
|
|
let mut cursor = fid_docid_facet_number.into_cursor()?;
|
|
|
|
while let Some((key, value)) = cursor.move_on_next()? {
|
2023-10-26 18:22:03 +02:00
|
|
|
let reader = KvReaderDelAdd::new(value);
|
2021-08-16 13:36:30 +02:00
|
|
|
if valid_lmdb_key(key) {
|
2023-10-26 18:22:03 +02:00
|
|
|
match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
|
|
|
|
(None, None) => {}
|
|
|
|
(None, Some(new)) => index_fid_docid_facet_numbers.put(wtxn, key, new)?,
|
|
|
|
(Some(_), None) => {
|
|
|
|
index_fid_docid_facet_numbers.delete(wtxn, key)?;
|
|
|
|
}
|
|
|
|
(Some(_), Some(new)) => {
|
|
|
|
index_fid_docid_facet_numbers.put(wtxn, key, new)?
|
|
|
|
}
|
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-02-16 15:28:48 +01:00
|
|
|
TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => {
|
2021-08-16 13:36:30 +02:00
|
|
|
let index_fid_docid_facet_strings =
|
|
|
|
index.field_id_docid_facet_strings.remap_types::<ByteSlice, ByteSlice>();
|
2022-02-16 15:28:48 +01:00
|
|
|
let mut cursor = fid_docid_facet_string.into_cursor()?;
|
|
|
|
while let Some((key, value)) = cursor.move_on_next()? {
|
2023-10-26 18:22:03 +02:00
|
|
|
let reader = KvReaderDelAdd::new(value);
|
2021-08-16 13:36:30 +02:00
|
|
|
if valid_lmdb_key(key) {
|
2023-10-26 18:22:03 +02:00
|
|
|
match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
|
|
|
|
(None, None) => {}
|
|
|
|
(None, Some(new)) => index_fid_docid_facet_strings.put(wtxn, key, new)?,
|
|
|
|
(Some(_), None) => {
|
|
|
|
index_fid_docid_facet_strings.delete(wtxn, key)?;
|
|
|
|
}
|
|
|
|
(Some(_), Some(new)) => {
|
|
|
|
index_fid_docid_facet_strings.put(wtxn, key, new)?
|
|
|
|
}
|
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-02-16 15:28:48 +01:00
|
|
|
TypedChunk::GeoPoints(geo_points) => {
|
2021-08-25 16:59:38 +02:00
|
|
|
let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default();
|
2021-09-09 12:20:08 +02:00
|
|
|
let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?;
|
2021-08-26 17:49:50 +02:00
|
|
|
|
2022-02-16 15:28:48 +01:00
|
|
|
let mut cursor = geo_points.into_cursor()?;
|
|
|
|
while let Some((key, value)) = cursor.move_on_next()? {
|
2021-08-23 18:41:48 +02:00
|
|
|
// convert the key back to a u32 (4 bytes)
|
2021-09-09 12:20:08 +02:00
|
|
|
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
2021-08-23 18:41:48 +02:00
|
|
|
|
2023-10-23 13:49:54 +02:00
|
|
|
let deladd_obkv = KvReaderDelAdd::new(value);
|
|
|
|
if let Some(value) = deladd_obkv.get(DelAdd::Deletion) {
|
|
|
|
let geopoint = extract_geo_point(value, docid);
|
|
|
|
rtree.remove(&geopoint);
|
|
|
|
geo_faceted_docids.remove(docid);
|
|
|
|
}
|
|
|
|
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
|
|
|
|
let geopoint = extract_geo_point(value, docid);
|
|
|
|
rtree.insert(geopoint);
|
|
|
|
geo_faceted_docids.insert(docid);
|
|
|
|
}
|
2021-08-23 18:41:48 +02:00
|
|
|
}
|
|
|
|
index.put_geo_rtree(wtxn, &rtree)?;
|
2021-09-09 12:20:08 +02:00
|
|
|
index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?;
|
2021-08-23 18:41:48 +02:00
|
|
|
}
|
2023-06-08 11:51:55 +02:00
|
|
|
TypedChunk::VectorPoints(vector_points) => {
|
2023-07-25 12:36:01 +02:00
|
|
|
let (pids, mut points): (Vec<_>, Vec<_>) = match index.vector_hnsw(wtxn)? {
|
|
|
|
Some(hnsw) => hnsw.iter().map(|(pid, point)| (pid, point.clone())).unzip(),
|
|
|
|
None => Default::default(),
|
2023-06-14 16:34:09 +02:00
|
|
|
};
|
|
|
|
|
2023-07-25 12:36:01 +02:00
|
|
|
// Convert the PointIds into DocumentIds
|
|
|
|
let mut docids = Vec::new();
|
|
|
|
for pid in pids {
|
|
|
|
let docid =
|
|
|
|
index.vector_id_docid.get(wtxn, &BEU32::new(pid.into_inner()))?.unwrap();
|
|
|
|
docids.push(docid.get());
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut expected_dimensions = points.get(0).map(|p| p.len());
|
2023-06-08 12:19:06 +02:00
|
|
|
let mut cursor = vector_points.into_cursor()?;
|
|
|
|
while let Some((key, value)) = cursor.move_on_next()? {
|
|
|
|
// convert the key back to a u32 (4 bytes)
|
2023-06-20 11:17:20 +02:00
|
|
|
let (left, _index) = try_split_array_at(key).unwrap();
|
|
|
|
let docid = DocumentId::from_be_bytes(left);
|
2023-06-14 14:20:05 +02:00
|
|
|
// convert the vector back to a Vec<f32>
|
|
|
|
let vector: Vec<f32> = pod_collect_to_vec(value);
|
2023-06-14 16:34:09 +02:00
|
|
|
|
2023-06-27 12:30:44 +02:00
|
|
|
// TODO Inform the user about the document that has a wrong `_vectors`
|
2023-06-14 16:34:09 +02:00
|
|
|
let found = vector.len();
|
|
|
|
let expected = *expected_dimensions.get_or_insert(found);
|
|
|
|
if expected != found {
|
2023-10-19 10:47:00 +02:00
|
|
|
return Err(UserError::InvalidVectorDimensions { expected, found }.into());
|
2023-06-14 16:34:09 +02:00
|
|
|
}
|
|
|
|
|
2023-07-25 12:36:01 +02:00
|
|
|
points.push(NDotProductPoint::new(vector));
|
|
|
|
docids.push(docid);
|
2023-06-08 12:19:06 +02:00
|
|
|
}
|
2023-07-25 12:36:01 +02:00
|
|
|
|
|
|
|
assert_eq!(docids.len(), points.len());
|
|
|
|
|
|
|
|
let hnsw_length = points.len();
|
|
|
|
let (new_hnsw, pids) = Hnsw::builder().build_hnsw(points);
|
|
|
|
|
|
|
|
index.vector_id_docid.clear(wtxn)?;
|
|
|
|
for (docid, pid) in docids.into_iter().zip(pids) {
|
|
|
|
index.vector_id_docid.put(
|
|
|
|
wtxn,
|
|
|
|
&BEU32::new(pid.into_inner()),
|
|
|
|
&BEU32::new(docid),
|
|
|
|
)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
log::debug!("There are {} entries in the HNSW so far", hnsw_length);
|
|
|
|
index.put_vector_hnsw(wtxn, &new_hnsw)?;
|
2023-06-08 11:51:55 +02:00
|
|
|
}
|
2023-10-19 10:22:39 +02:00
|
|
|
TypedChunk::ScriptLanguageDocids(sl_map) => {
|
|
|
|
for (key, (deletion, addition)) in sl_map {
|
|
|
|
let mut db_key_exists = false;
|
|
|
|
let final_value = match index.script_language_docids.get(wtxn, &key)? {
|
|
|
|
Some(db_values) => {
|
|
|
|
db_key_exists = true;
|
|
|
|
(db_values - deletion) | addition
|
|
|
|
}
|
|
|
|
None => addition,
|
|
|
|
};
|
|
|
|
|
|
|
|
if final_value.is_empty() {
|
|
|
|
// If the database entry exists, delete it.
|
|
|
|
if db_key_exists == true {
|
2023-10-16 14:58:11 +02:00
|
|
|
index.script_language_docids.delete(wtxn, &key)?;
|
2022-10-12 13:24:56 +02:00
|
|
|
}
|
2023-10-19 10:22:39 +02:00
|
|
|
} else {
|
|
|
|
index.script_language_docids.put(wtxn, &key, &final_value)?;
|
2023-10-16 14:58:11 +02:00
|
|
|
}
|
|
|
|
}
|
2022-10-17 13:51:04 +02:00
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
|
|
|
|
2021-08-17 10:56:06 +02:00
|
|
|
Ok((RoaringBitmap::new(), is_merged_database))
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
|
|
|
|
2023-10-23 13:49:54 +02:00
|
|
|
/// Converts the latitude and longitude back to an xyz GeoPoint.
|
|
|
|
fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
|
|
|
|
let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
|
|
|
|
let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
|
|
|
|
let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)];
|
|
|
|
let xyz_point = lat_lng_to_xyz(&point);
|
|
|
|
GeoPoint::new(xyz_point, (docid, point))
|
|
|
|
}
|
|
|
|
|
2022-03-24 15:22:57 +01:00
|
|
|
fn merge_word_docids_reader_into_fst(
|
|
|
|
word_docids_iter: grenad::Reader<io::Cursor<ClonableMmap>>,
|
|
|
|
exact_word_docids_iter: grenad::Reader<io::Cursor<ClonableMmap>>,
|
|
|
|
) -> Result<fst::Set<Vec<u8>>> {
|
2022-04-05 18:44:35 +02:00
|
|
|
let mut merger_builder = MergerBuilder::new(merge_ignore_values as MergeFn);
|
2022-03-24 15:22:57 +01:00
|
|
|
merger_builder.push(word_docids_iter.into_cursor()?);
|
|
|
|
merger_builder.push(exact_word_docids_iter.into_cursor()?);
|
|
|
|
let mut iter = merger_builder.build().into_stream_merger_iter()?;
|
|
|
|
let mut builder = fst::SetBuilder::memory();
|
|
|
|
|
|
|
|
while let Some((k, _)) = iter.next()? {
|
|
|
|
builder.insert(k)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(builder.into_set())
|
|
|
|
}
|
|
|
|
|
2023-10-19 10:47:00 +02:00
|
|
|
/// A function that extracts and returns the Add side of a DelAdd obkv.
|
|
|
|
/// This is useful when there are no previous value in the database and
|
|
|
|
/// therefore we don't need to do a diff with what's already there.
|
|
|
|
///
|
|
|
|
/// If there is no Add side we currently write an empty buffer
|
|
|
|
/// which is a valid CboRoaringBitmap.
|
|
|
|
fn deladd_serialize_add_side<'a>(obkv: &'a [u8], _buffer: &mut Vec<u8>) -> Result<&'a [u8]> {
|
|
|
|
Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default())
|
|
|
|
}
|
|
|
|
|
2023-10-19 11:18:30 +02:00
|
|
|
/// A function that merges a DelAdd of bitmaps into an already existing bitmap.
///
/// The first argument is the DelAdd obkv of CboRoaringBitmaps and
/// the second one is the CboRoaringBitmap to merge into.
///
/// NOTE(review): callers (`write_entries_into_database`) delete the entry
/// when this returns `Ok(None)` — presumably `merge_deladd_into` yields
/// `None` when the merged bitmap ends up empty; confirm against the codec.
fn merge_deladd_cbo_roaring_bitmaps<'a>(
    deladd_obkv: &[u8],
    previous: &[u8],
    buffer: &'a mut Vec<u8>,
) -> Result<Option<&'a [u8]>> {
    Ok(CboRoaringBitmapCodec::merge_deladd_into(
        KvReaderDelAdd::new(deladd_obkv),
        previous,
        buffer,
    )?)
}
|
|
|
|
|
2021-08-16 13:36:30 +02:00
|
|
|
/// Write provided entries in database using serialize_value function.
|
|
|
|
/// merge_values function is used if an entry already exist in the database.
|
|
|
|
fn write_entries_into_database<R, K, V, FS, FM>(
|
2022-02-16 15:28:48 +01:00
|
|
|
data: grenad::Reader<R>,
|
2021-08-16 13:36:30 +02:00
|
|
|
database: &heed::Database<K, V>,
|
|
|
|
wtxn: &mut RwTxn,
|
|
|
|
index_is_empty: bool,
|
|
|
|
serialize_value: FS,
|
|
|
|
merge_values: FM,
|
|
|
|
) -> Result<()>
|
|
|
|
where
|
2022-02-16 15:28:48 +01:00
|
|
|
R: io::Read + io::Seek,
|
2021-08-16 13:36:30 +02:00
|
|
|
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
|
2023-10-26 18:06:41 +02:00
|
|
|
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
|
2021-08-16 13:36:30 +02:00
|
|
|
{
|
2023-07-10 18:41:54 +02:00
|
|
|
puffin::profile_function!(format!("number of entries: {}", data.len()));
|
|
|
|
|
2021-08-16 13:36:30 +02:00
|
|
|
let mut buffer = Vec::new();
|
|
|
|
let database = database.remap_types::<ByteSlice, ByteSlice>();
|
|
|
|
|
2022-02-16 15:28:48 +01:00
|
|
|
let mut cursor = data.into_cursor()?;
|
|
|
|
while let Some((key, value)) = cursor.move_on_next()? {
|
2021-08-16 13:36:30 +02:00
|
|
|
if valid_lmdb_key(key) {
|
|
|
|
buffer.clear();
|
|
|
|
let value = if index_is_empty {
|
2023-10-26 18:06:41 +02:00
|
|
|
Some(serialize_value(value, &mut buffer)?)
|
2021-08-16 13:36:30 +02:00
|
|
|
} else {
|
|
|
|
match database.get(wtxn, key)? {
|
2023-10-26 18:06:41 +02:00
|
|
|
Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
|
|
|
|
None => Some(serialize_value(value, &mut buffer)?),
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
|
|
|
};
|
2023-10-26 18:06:41 +02:00
|
|
|
match value {
|
|
|
|
Some(value) => database.put(wtxn, key, value)?,
|
|
|
|
None => {
|
|
|
|
database.delete(wtxn, key)?;
|
|
|
|
}
|
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database.
/// All provided entries must be ordered.
/// If the index is not empty, write_entries_into_database is called instead.
fn append_entries_into_database<R, K, V, FS, FM>(
    data: grenad::Reader<R>,
    database: &heed::Database<K, V>,
    wtxn: &mut RwTxn,
    index_is_empty: bool,
    serialize_value: FS,
    merge_values: FM,
) -> Result<()>
where
    R: io::Read + io::Seek,
    FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
    FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
    K: for<'a> heed::BytesDecode<'a>,
{
    puffin::profile_function!(format!("number of entries: {}", data.len()));

    // The append fast path is only valid when the database holds no entry
    // yet; otherwise fall back to the merging write path.
    if !index_is_empty {
        return write_entries_into_database(
            data,
            database,
            wtxn,
            false,
            serialize_value,
            merge_values,
        );
    }

    let mut buffer = Vec::new();
    let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();

    let mut cursor = data.into_cursor()?;
    while let Some((key, value)) = cursor.move_on_next()? {
        if valid_lmdb_key(key) {
            // Catch, in debug builds, keys that the typed codec would fail
            // to decode later on read.
            debug_assert!(
                K::bytes_decode(key).is_some(),
                "Couldn't decode key with the database decoder, key length: {} - key bytes: {:x?}",
                key.len(),
                &key
            );
            buffer.clear();
            let value = serialize_value(value, &mut buffer)?;
            // SAFETY: `append` requires keys in ascending order; per this
            // function's contract ("All provided entries must be ordered")
            // the grenad cursor yields them in order, and the database is
            // empty at this point so every appended key is the greatest.
            unsafe { database.append(key, value)? };
        }
    }

    Ok(())
}
|