mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-07 12:04:30 +01:00
Merge pull request #4141 from meilisearch/diff-indexing-searchable
Diff indexing searchable
This commit is contained in:
commit
066221fd2b
@ -98,3 +98,7 @@ pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
|
|||||||
|
|
||||||
writer.finish()
|
writer.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool {
|
||||||
|
del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition)
|
||||||
|
}
|
||||||
|
@ -13,7 +13,7 @@ use crate::error::{InternalError, SerializationError};
|
|||||||
use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
|
use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
|
||||||
use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};
|
use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};
|
||||||
|
|
||||||
pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), RoaringBitmap>;
|
pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>;
|
||||||
|
|
||||||
/// Extracts the word and positions where this word appear and
|
/// Extracts the word and positions where this word appear and
|
||||||
/// prefixes it by the document id.
|
/// prefixes it by the document id.
|
||||||
@ -29,8 +29,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
|||||||
allowed_separators: Option<&[&str]>,
|
allowed_separators: Option<&[&str]>,
|
||||||
dictionary: Option<&[&str]>,
|
dictionary: Option<&[&str]>,
|
||||||
max_positions_per_attributes: Option<u32>,
|
max_positions_per_attributes: Option<u32>,
|
||||||
) -> Result<(RoaringBitmap, grenad::Reader<File>, (ScriptLanguageDocidsMap, ScriptLanguageDocidsMap))>
|
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
|
||||||
{
|
|
||||||
puffin::profile_function!();
|
puffin::profile_function!();
|
||||||
|
|
||||||
let max_positions_per_attributes = max_positions_per_attributes
|
let max_positions_per_attributes = max_positions_per_attributes
|
||||||
@ -39,8 +38,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
// initialize destination values.
|
// initialize destination values.
|
||||||
let mut documents_ids = RoaringBitmap::new();
|
let mut documents_ids = RoaringBitmap::new();
|
||||||
let mut del_script_language_docids = HashMap::new();
|
let mut script_language_docids = HashMap::new();
|
||||||
let mut add_script_language_docids = HashMap::new();
|
|
||||||
let mut docid_word_positions_sorter = create_sorter(
|
let mut docid_word_positions_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Stable,
|
grenad::SortAlgorithm::Stable,
|
||||||
keep_latest_obkv,
|
keep_latest_obkv,
|
||||||
@ -134,25 +132,24 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
|||||||
// update script_language_docids deletions.
|
// update script_language_docids deletions.
|
||||||
for (script, languages_frequency) in del_script_language_word_count {
|
for (script, languages_frequency) in del_script_language_word_count {
|
||||||
for (language, _) in languages_frequency {
|
for (language, _) in languages_frequency {
|
||||||
let entry = del_script_language_docids
|
let entry = script_language_docids
|
||||||
.entry((script, language))
|
.entry((script, language))
|
||||||
.or_insert_with(RoaringBitmap::new);
|
.or_insert_with(|| (RoaringBitmap::new(), RoaringBitmap::new()));
|
||||||
entry.push(document_id);
|
entry.0.push(document_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// update script_language_docids additions.
|
// update script_language_docids additions.
|
||||||
for (script, languages_frequency) in add_script_language_word_count {
|
for (script, languages_frequency) in add_script_language_word_count {
|
||||||
for (language, _) in languages_frequency {
|
for (language, _) in languages_frequency {
|
||||||
let entry = add_script_language_docids
|
let entry = script_language_docids
|
||||||
.entry((script, language))
|
.entry((script, language))
|
||||||
.or_insert_with(RoaringBitmap::new);
|
.or_insert_with(|| (RoaringBitmap::new(), RoaringBitmap::new()));
|
||||||
entry.push(document_id);
|
entry.1.push(document_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let script_language_docids = (del_script_language_docids, add_script_language_docids);
|
|
||||||
sorter_into_reader(docid_word_positions_sorter, indexer)
|
sorter_into_reader(docid_word_positions_sorter, indexer)
|
||||||
.map(|reader| (documents_ids, reader, script_language_docids))
|
.map(|reader| (documents_ids, reader, script_language_docids))
|
||||||
}
|
}
|
||||||
|
@ -6,12 +6,13 @@ use heed::BytesDecode;
|
|||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader,
|
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
|
||||||
try_split_array_at, writer_into_reader, GrenadParameters,
|
try_split_array_at, writer_into_reader, GrenadParameters,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::heed_codec::StrBEU16Codec;
|
use crate::heed_codec::StrBEU16Codec;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
|
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::update::MergeFn;
|
use crate::update::MergeFn;
|
||||||
use crate::{DocumentId, FieldId, Result};
|
use crate::{DocumentId, FieldId, Result};
|
||||||
|
|
||||||
@ -34,14 +35,15 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
let mut word_fid_docids_sorter = create_sorter(
|
let mut word_fid_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|x| x / 3),
|
max_memory.map(|x| x / 3),
|
||||||
);
|
);
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
let mut words = BTreeSet::new();
|
let mut del_words = BTreeSet::new();
|
||||||
|
let mut add_words = BTreeSet::new();
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
while let Some((key, value)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
|
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
|
||||||
@ -51,24 +53,37 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||||
let fid = u16::from_be_bytes(fid_bytes);
|
let fid = u16::from_be_bytes(fid_bytes);
|
||||||
|
|
||||||
for (_pos, word) in KvReaderU16::new(&value).iter() {
|
let del_add_reader = KvReaderDelAdd::new(&value);
|
||||||
words.insert(word.to_vec());
|
// extract all unique words to remove.
|
||||||
|
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
|
||||||
|
for (_pos, word) in KvReaderU16::new(&deletion).iter() {
|
||||||
|
del_words.insert(word.to_vec());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract all unique additional words.
|
||||||
|
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
|
||||||
|
for (_pos, word) in KvReaderU16::new(&addition).iter() {
|
||||||
|
add_words.insert(word.to_vec());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
words_into_sorter(
|
words_into_sorter(
|
||||||
document_id,
|
document_id,
|
||||||
fid,
|
fid,
|
||||||
&mut key_buffer,
|
&mut key_buffer,
|
||||||
&mut words,
|
&del_words,
|
||||||
|
&add_words,
|
||||||
&mut word_fid_docids_sorter,
|
&mut word_fid_docids_sorter,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
words.clear();
|
del_words.clear();
|
||||||
|
add_words.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut word_docids_sorter = create_sorter(
|
let mut word_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
@ -77,7 +92,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
let mut exact_word_docids_sorter = create_sorter(
|
let mut exact_word_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
@ -91,8 +106,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
);
|
);
|
||||||
|
|
||||||
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
|
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
|
||||||
|
// TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
|
||||||
while let Some((key, value)) = iter.next()? {
|
while let Some((key, value)) = iter.next()? {
|
||||||
word_fid_docids_writer.insert(key, value)?;
|
// only keep the value if their is a change to apply in the DB.
|
||||||
|
if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) {
|
||||||
|
word_fid_docids_writer.insert(key, value)?;
|
||||||
|
}
|
||||||
|
|
||||||
let (word, fid) = StrBEU16Codec::bytes_decode(key)
|
let (word, fid) = StrBEU16Codec::bytes_decode(key)
|
||||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||||
@ -116,20 +135,41 @@ fn words_into_sorter(
|
|||||||
document_id: DocumentId,
|
document_id: DocumentId,
|
||||||
fid: FieldId,
|
fid: FieldId,
|
||||||
key_buffer: &mut Vec<u8>,
|
key_buffer: &mut Vec<u8>,
|
||||||
words: &mut BTreeSet<Vec<u8>>,
|
del_words: &BTreeSet<Vec<u8>>,
|
||||||
|
add_words: &BTreeSet<Vec<u8>>,
|
||||||
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
puffin::profile_function!();
|
puffin::profile_function!();
|
||||||
|
|
||||||
for word_bytes in words.iter() {
|
use itertools::merge_join_by;
|
||||||
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
|
||||||
|
buffer.clear();
|
||||||
|
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
||||||
|
let word_bytes = match eob {
|
||||||
|
Left(word_bytes) => {
|
||||||
|
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||||
|
word_bytes
|
||||||
|
}
|
||||||
|
Right(word_bytes) => {
|
||||||
|
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||||
|
word_bytes
|
||||||
|
}
|
||||||
|
Both(word_bytes, _) => {
|
||||||
|
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||||
|
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||||
|
word_bytes
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
key_buffer.clear();
|
key_buffer.clear();
|
||||||
key_buffer.extend_from_slice(&word_bytes);
|
key_buffer.extend_from_slice(&word_bytes);
|
||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
||||||
word_fid_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
|
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
words.clear();
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::{cmp, io};
|
use std::{cmp, io};
|
||||||
|
|
||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader,
|
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
|
||||||
try_split_array_at, writer_into_reader, GrenadParameters, MergeFn,
|
writer_into_reader, GrenadParameters, MergeFn,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::proximity::{index_proximity, MAX_DISTANCE};
|
use crate::proximity::{index_proximity, MAX_DISTANCE};
|
||||||
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::{DocumentId, Result};
|
use crate::{DocumentId, Result};
|
||||||
|
|
||||||
/// Extracts the best proximity between pairs of words and the documents ids where this pair appear.
|
/// Extracts the best proximity between pairs of words and the documents ids where this pair appear.
|
||||||
@ -31,7 +32,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
.map(|_| {
|
.map(|_| {
|
||||||
create_sorter(
|
create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
@ -40,9 +41,12 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let mut word_positions: VecDeque<(String, u16)> =
|
let mut del_word_positions: VecDeque<(String, u16)> =
|
||||||
VecDeque::with_capacity(MAX_DISTANCE as usize);
|
VecDeque::with_capacity(MAX_DISTANCE as usize);
|
||||||
let mut word_pair_proximity = HashMap::new();
|
let mut add_word_positions: VecDeque<(String, u16)> =
|
||||||
|
VecDeque::with_capacity(MAX_DISTANCE as usize);
|
||||||
|
let mut del_word_pair_proximity = BTreeMap::new();
|
||||||
|
let mut add_word_pair_proximity = BTreeMap::new();
|
||||||
let mut current_document_id = None;
|
let mut current_document_id = None;
|
||||||
|
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
@ -54,50 +58,90 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
// if we change document, we fill the sorter
|
// if we change document, we fill the sorter
|
||||||
if current_document_id.map_or(false, |id| id != document_id) {
|
if current_document_id.map_or(false, |id| id != document_id) {
|
||||||
puffin::profile_scope!("Document into sorter");
|
puffin::profile_scope!("Document into sorter");
|
||||||
while !word_positions.is_empty() {
|
|
||||||
word_positions_into_word_pair_proximity(
|
|
||||||
&mut word_positions,
|
|
||||||
&mut word_pair_proximity,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
document_word_positions_into_sorter(
|
document_word_positions_into_sorter(
|
||||||
current_document_id.unwrap(),
|
current_document_id.unwrap(),
|
||||||
&word_pair_proximity,
|
&del_word_pair_proximity,
|
||||||
|
&add_word_pair_proximity,
|
||||||
&mut word_pair_proximity_docids_sorters,
|
&mut word_pair_proximity_docids_sorters,
|
||||||
)?;
|
)?;
|
||||||
word_pair_proximity.clear();
|
del_word_pair_proximity.clear();
|
||||||
word_positions.clear();
|
add_word_pair_proximity.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
current_document_id = Some(document_id);
|
current_document_id = Some(document_id);
|
||||||
|
|
||||||
for (position, word) in KvReaderU16::new(&value).iter() {
|
let (del, add): (Result<_>, Result<_>) = rayon::join(
|
||||||
// drain the proximity window until the head word is considered close to the word we are inserting.
|
|| {
|
||||||
while word_positions.get(0).map_or(false, |(_w, p)| {
|
// deletions
|
||||||
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
|
if let Some(deletion) = KvReaderDelAdd::new(&value).get(DelAdd::Deletion) {
|
||||||
}) {
|
for (position, word) in KvReaderU16::new(deletion).iter() {
|
||||||
word_positions_into_word_pair_proximity(
|
// drain the proximity window until the head word is considered close to the word we are inserting.
|
||||||
&mut word_positions,
|
while del_word_positions.get(0).map_or(false, |(_w, p)| {
|
||||||
&mut word_pair_proximity,
|
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
|
||||||
)?;
|
}) {
|
||||||
}
|
word_positions_into_word_pair_proximity(
|
||||||
|
&mut del_word_positions,
|
||||||
|
&mut del_word_pair_proximity,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
// insert the new word.
|
// insert the new word.
|
||||||
let word = std::str::from_utf8(word)?;
|
let word = std::str::from_utf8(word)?;
|
||||||
word_positions.push_back((word.to_string(), position));
|
del_word_positions.push_back((word.to_string(), position));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while !del_word_positions.is_empty() {
|
||||||
|
word_positions_into_word_pair_proximity(
|
||||||
|
&mut del_word_positions,
|
||||||
|
&mut del_word_pair_proximity,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
|| {
|
||||||
|
// additions
|
||||||
|
if let Some(addition) = KvReaderDelAdd::new(&value).get(DelAdd::Addition) {
|
||||||
|
for (position, word) in KvReaderU16::new(addition).iter() {
|
||||||
|
// drain the proximity window until the head word is considered close to the word we are inserting.
|
||||||
|
while add_word_positions.get(0).map_or(false, |(_w, p)| {
|
||||||
|
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
|
||||||
|
}) {
|
||||||
|
word_positions_into_word_pair_proximity(
|
||||||
|
&mut add_word_positions,
|
||||||
|
&mut add_word_pair_proximity,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert the new word.
|
||||||
|
let word = std::str::from_utf8(word)?;
|
||||||
|
add_word_positions.push_back((word.to_string(), position));
|
||||||
|
}
|
||||||
|
|
||||||
|
while !add_word_positions.is_empty() {
|
||||||
|
word_positions_into_word_pair_proximity(
|
||||||
|
&mut add_word_positions,
|
||||||
|
&mut add_word_pair_proximity,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
del?;
|
||||||
|
add?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(document_id) = current_document_id {
|
if let Some(document_id) = current_document_id {
|
||||||
puffin::profile_scope!("Final document into sorter");
|
puffin::profile_scope!("Final document into sorter");
|
||||||
while !word_positions.is_empty() {
|
|
||||||
word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
document_word_positions_into_sorter(
|
document_word_positions_into_sorter(
|
||||||
document_id,
|
document_id,
|
||||||
&word_pair_proximity,
|
&del_word_pair_proximity,
|
||||||
|
&add_word_pair_proximity,
|
||||||
&mut word_pair_proximity_docids_sorters,
|
&mut word_pair_proximity_docids_sorters,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@ -123,11 +167,38 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
/// close to each other.
|
/// close to each other.
|
||||||
fn document_word_positions_into_sorter(
|
fn document_word_positions_into_sorter(
|
||||||
document_id: DocumentId,
|
document_id: DocumentId,
|
||||||
word_pair_proximity: &HashMap<(String, String), u8>,
|
del_word_pair_proximity: &BTreeMap<(String, String), u8>,
|
||||||
|
add_word_pair_proximity: &BTreeMap<(String, String), u8>,
|
||||||
word_pair_proximity_docids_sorters: &mut Vec<grenad::Sorter<MergeFn>>,
|
word_pair_proximity_docids_sorters: &mut Vec<grenad::Sorter<MergeFn>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
use itertools::merge_join_by;
|
||||||
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
for ((w1, w2), prox) in word_pair_proximity {
|
for eob in
|
||||||
|
merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| {
|
||||||
|
d.cmp(a)
|
||||||
|
})
|
||||||
|
{
|
||||||
|
buffer.clear();
|
||||||
|
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
||||||
|
let ((w1, w2), prox) = match eob {
|
||||||
|
Left(key_value) => {
|
||||||
|
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||||
|
key_value
|
||||||
|
}
|
||||||
|
Right(key_value) => {
|
||||||
|
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||||
|
key_value
|
||||||
|
}
|
||||||
|
Both(key_value, _) => {
|
||||||
|
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||||
|
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||||
|
key_value
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
key_buffer.clear();
|
key_buffer.clear();
|
||||||
key_buffer.push(*prox as u8);
|
key_buffer.push(*prox as u8);
|
||||||
key_buffer.extend_from_slice(w1.as_bytes());
|
key_buffer.extend_from_slice(w1.as_bytes());
|
||||||
@ -135,7 +206,7 @@ fn document_word_positions_into_sorter(
|
|||||||
key_buffer.extend_from_slice(w2.as_bytes());
|
key_buffer.extend_from_slice(w2.as_bytes());
|
||||||
|
|
||||||
word_pair_proximity_docids_sorters[*prox as usize - 1]
|
word_pair_proximity_docids_sorters[*prox as usize - 1]
|
||||||
.insert(&key_buffer, document_id.to_ne_bytes())?;
|
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -143,7 +214,7 @@ fn document_word_positions_into_sorter(
|
|||||||
|
|
||||||
fn word_positions_into_word_pair_proximity(
|
fn word_positions_into_word_pair_proximity(
|
||||||
word_positions: &mut VecDeque<(String, u16)>,
|
word_positions: &mut VecDeque<(String, u16)>,
|
||||||
word_pair_proximity: &mut HashMap<(String, String), u8>,
|
word_pair_proximity: &mut BTreeMap<(String, String), u8>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (head_word, head_position) = word_positions.pop_front().unwrap();
|
let (head_word, head_position) = word_positions.pop_front().unwrap();
|
||||||
for (word, position) in word_positions.iter() {
|
for (word, position) in word_positions.iter() {
|
||||||
|
@ -1,15 +1,17 @@
|
|||||||
use std::collections::HashSet;
|
use std::collections::BTreeSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
||||||
GrenadParameters,
|
GrenadParameters,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
|
use crate::update::MergeFn;
|
||||||
use crate::{bucketed_position, DocumentId, Result};
|
use crate::{bucketed_position, DocumentId, Result};
|
||||||
|
|
||||||
/// Extracts the word positions and the documents ids where this word appear.
|
/// Extracts the word positions and the documents ids where this word appear.
|
||||||
@ -27,14 +29,15 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
let mut word_position_docids_sorter = create_sorter(
|
let mut word_position_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory,
|
max_memory,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut word_positions: HashSet<(u16, Vec<u8>)> = HashSet::new();
|
let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
|
||||||
|
let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
|
||||||
let mut current_document_id: Option<u32> = None;
|
let mut current_document_id: Option<u32> = None;
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
@ -44,36 +47,92 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
|||||||
let document_id = DocumentId::from_be_bytes(document_id_bytes);
|
let document_id = DocumentId::from_be_bytes(document_id_bytes);
|
||||||
|
|
||||||
if current_document_id.map_or(false, |id| document_id != id) {
|
if current_document_id.map_or(false, |id| document_id != id) {
|
||||||
for (position, word_bytes) in word_positions.iter() {
|
words_position_into_sorter(
|
||||||
key_buffer.clear();
|
current_document_id.unwrap(),
|
||||||
key_buffer.extend_from_slice(word_bytes);
|
&mut key_buffer,
|
||||||
key_buffer.push(0);
|
&del_word_positions,
|
||||||
key_buffer.extend_from_slice(&position.to_be_bytes());
|
&add_word_positions,
|
||||||
word_position_docids_sorter
|
&mut word_position_docids_sorter,
|
||||||
.insert(&key_buffer, current_document_id.unwrap().to_ne_bytes())?;
|
)?;
|
||||||
}
|
del_word_positions.clear();
|
||||||
word_positions.clear();
|
add_word_positions.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
current_document_id = Some(document_id);
|
current_document_id = Some(document_id);
|
||||||
|
|
||||||
for (position, word_bytes) in KvReaderU16::new(&value).iter() {
|
let del_add_reader = KvReaderDelAdd::new(&value);
|
||||||
let position = bucketed_position(position);
|
// extract all unique words to remove.
|
||||||
word_positions.insert((position, word_bytes.to_vec()));
|
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
|
||||||
|
for (position, word_bytes) in KvReaderU16::new(deletion).iter() {
|
||||||
|
let position = bucketed_position(position);
|
||||||
|
del_word_positions.insert((position, word_bytes.to_vec()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract all unique additional words.
|
||||||
|
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
|
||||||
|
for (position, word_bytes) in KvReaderU16::new(addition).iter() {
|
||||||
|
let position = bucketed_position(position);
|
||||||
|
add_word_positions.insert((position, word_bytes.to_vec()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(document_id) = current_document_id {
|
if let Some(document_id) = current_document_id {
|
||||||
for (position, word_bytes) in word_positions {
|
words_position_into_sorter(
|
||||||
key_buffer.clear();
|
document_id,
|
||||||
key_buffer.extend_from_slice(&word_bytes);
|
&mut key_buffer,
|
||||||
key_buffer.push(0);
|
&del_word_positions,
|
||||||
key_buffer.extend_from_slice(&position.to_be_bytes());
|
&add_word_positions,
|
||||||
word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
|
&mut word_position_docids_sorter,
|
||||||
}
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO remove noop DelAdd OBKV
|
||||||
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
|
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
|
||||||
|
|
||||||
Ok(word_position_docids_reader)
|
Ok(word_position_docids_reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn words_position_into_sorter(
|
||||||
|
document_id: DocumentId,
|
||||||
|
key_buffer: &mut Vec<u8>,
|
||||||
|
del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
|
||||||
|
add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
|
||||||
|
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||||
|
) -> Result<()> {
|
||||||
|
puffin::profile_function!();
|
||||||
|
|
||||||
|
use itertools::merge_join_by;
|
||||||
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
|
||||||
|
{
|
||||||
|
buffer.clear();
|
||||||
|
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
||||||
|
let (position, word_bytes) = match eob {
|
||||||
|
Left(key) => {
|
||||||
|
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||||
|
key
|
||||||
|
}
|
||||||
|
Right(key) => {
|
||||||
|
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||||
|
key
|
||||||
|
}
|
||||||
|
Both(key, _) => {
|
||||||
|
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||||
|
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||||
|
key
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
key_buffer.clear();
|
||||||
|
key_buffer.extend_from_slice(word_bytes);
|
||||||
|
key_buffer.push(0);
|
||||||
|
key_buffer.extend_from_slice(&position.to_be_bytes());
|
||||||
|
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
@ -43,9 +43,7 @@ pub(crate) enum TypedChunk {
|
|||||||
FieldIdFacetIsEmptyDocids(grenad::Reader<File>),
|
FieldIdFacetIsEmptyDocids(grenad::Reader<File>),
|
||||||
GeoPoints(grenad::Reader<File>),
|
GeoPoints(grenad::Reader<File>),
|
||||||
VectorPoints(grenad::Reader<File>),
|
VectorPoints(grenad::Reader<File>),
|
||||||
ScriptLanguageDocids(
|
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
|
||||||
(HashMap<(Script, Language), RoaringBitmap>, HashMap<(Script, Language), RoaringBitmap>),
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TypedChunk {
|
impl TypedChunk {
|
||||||
@ -103,8 +101,8 @@ impl TypedChunk {
|
|||||||
TypedChunk::VectorPoints(grenad) => {
|
TypedChunk::VectorPoints(grenad) => {
|
||||||
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
|
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
|
||||||
}
|
}
|
||||||
TypedChunk::ScriptLanguageDocids((_, addition)) => {
|
TypedChunk::ScriptLanguageDocids(sl_map) => {
|
||||||
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", addition.len())
|
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", sl_map.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -346,24 +344,25 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
log::debug!("There are {} entries in the HNSW so far", hnsw_length);
|
log::debug!("There are {} entries in the HNSW so far", hnsw_length);
|
||||||
index.put_vector_hnsw(wtxn, &new_hnsw)?;
|
index.put_vector_hnsw(wtxn, &new_hnsw)?;
|
||||||
}
|
}
|
||||||
TypedChunk::ScriptLanguageDocids((deletion, addition)) => {
|
TypedChunk::ScriptLanguageDocids(sl_map) => {
|
||||||
for (key, value) in deletion {
|
for (key, (deletion, addition)) in sl_map {
|
||||||
if let Some(mut db_values) = index.script_language_docids.get(wtxn, &key)? {
|
let mut db_key_exists = false;
|
||||||
db_values -= value;
|
|
||||||
if db_values.is_empty() {
|
|
||||||
index.script_language_docids.delete(wtxn, &key)?;
|
|
||||||
} else {
|
|
||||||
index.script_language_docids.put(wtxn, &key, &db_values)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (key, value) in addition {
|
|
||||||
let final_value = match index.script_language_docids.get(wtxn, &key)? {
|
let final_value = match index.script_language_docids.get(wtxn, &key)? {
|
||||||
Some(mut db_values) => db_values | value,
|
Some(db_values) => {
|
||||||
None => value,
|
db_key_exists = true;
|
||||||
|
(db_values - deletion) | addition
|
||||||
|
}
|
||||||
|
None => addition,
|
||||||
};
|
};
|
||||||
index.script_language_docids.put(wtxn, &key, &final_value)?;
|
|
||||||
|
if final_value.is_empty() {
|
||||||
|
// If the database entry exists, delete it.
|
||||||
|
if db_key_exists == true {
|
||||||
|
index.script_language_docids.delete(wtxn, &key)?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
index.script_language_docids.put(wtxn, &key, &final_value)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -388,13 +387,6 @@ fn merge_word_docids_reader_into_fst(
|
|||||||
Ok(builder.into_set())
|
Ok(builder.into_set())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec<u8>) -> Result<()> {
|
|
||||||
let new_value = RoaringBitmap::deserialize_from(new_value)?;
|
|
||||||
let db_value = RoaringBitmap::deserialize_from(db_value)?;
|
|
||||||
let value = new_value | db_value;
|
|
||||||
Ok(serialize_roaring_bitmap(&value, buffer)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn merge_cbo_roaring_bitmaps(
|
fn merge_cbo_roaring_bitmaps(
|
||||||
new_value: &[u8],
|
new_value: &[u8],
|
||||||
db_value: &[u8],
|
db_value: &[u8],
|
||||||
|
Loading…
x
Reference in New Issue
Block a user