mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-30 00:34:26 +01:00
Use the sorter cache in the word docids extractor
This commit is contained in:
parent
5d5769fd8a
commit
8319552e7d
@ -39,7 +39,24 @@ where
|
|||||||
del.get_or_insert_with(RoaringBitmap::new).insert(n);
|
del.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del(n)) {
|
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_u32(n)) {
|
||||||
|
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
||||||
|
None => Ok(()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_del(
|
||||||
|
&mut self,
|
||||||
|
key: &[u8],
|
||||||
|
bitmap: RoaringBitmap,
|
||||||
|
) -> Result<(), grenad::Error<U>> {
|
||||||
|
match self.cache.get_mut(key) {
|
||||||
|
Some(DelAddRoaringBitmap { del, add: _ }) => {
|
||||||
|
*del.get_or_insert_with(RoaringBitmap::new) |= bitmap;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del(bitmap)) {
|
||||||
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
||||||
None => Ok(()),
|
None => Ok(()),
|
||||||
},
|
},
|
||||||
@ -52,7 +69,24 @@ where
|
|||||||
add.get_or_insert_with(RoaringBitmap::new).insert(n);
|
add.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add(n)) {
|
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add_u32(n)) {
|
||||||
|
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
||||||
|
None => Ok(()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_add(
|
||||||
|
&mut self,
|
||||||
|
key: &[u8],
|
||||||
|
bitmap: RoaringBitmap,
|
||||||
|
) -> Result<(), grenad::Error<U>> {
|
||||||
|
match self.cache.get_mut(key) {
|
||||||
|
Some(DelAddRoaringBitmap { del: _, add }) => {
|
||||||
|
*add.get_or_insert_with(RoaringBitmap::new) |= bitmap;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add(bitmap)) {
|
||||||
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
||||||
None => Ok(()),
|
None => Ok(()),
|
||||||
},
|
},
|
||||||
@ -66,7 +100,7 @@ where
|
|||||||
add.get_or_insert_with(RoaringBitmap::new).insert(n);
|
add.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_add(n)) {
|
None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_add_u32(n)) {
|
||||||
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
Some((key, deladd)) => self.write_entry_to_sorter(key, deladd),
|
||||||
None => Ok(()),
|
None => Ok(()),
|
||||||
},
|
},
|
||||||
@ -121,18 +155,30 @@ pub struct DelAddRoaringBitmap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DelAddRoaringBitmap {
|
impl DelAddRoaringBitmap {
|
||||||
fn new_del_add(n: u32) -> Self {
|
fn new_del_add(bitmap: RoaringBitmap) -> Self {
|
||||||
|
DelAddRoaringBitmap { del: Some(bitmap.clone()), add: Some(bitmap) }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_del_add_u32(n: u32) -> Self {
|
||||||
DelAddRoaringBitmap {
|
DelAddRoaringBitmap {
|
||||||
del: Some(RoaringBitmap::from([n])),
|
del: Some(RoaringBitmap::from([n])),
|
||||||
add: Some(RoaringBitmap::from([n])),
|
add: Some(RoaringBitmap::from([n])),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_del(n: u32) -> Self {
|
fn new_del(bitmap: RoaringBitmap) -> Self {
|
||||||
|
DelAddRoaringBitmap { del: Some(bitmap), add: None }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_del_u32(n: u32) -> Self {
|
||||||
DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None }
|
DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_add(n: u32) -> Self {
|
fn new_add(bitmap: RoaringBitmap) -> Self {
|
||||||
|
DelAddRoaringBitmap { del: None, add: Some(bitmap) }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_add_u32(n: u32) -> Self {
|
||||||
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
|
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufReader};
|
use std::io::{self, BufReader};
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
|
||||||
use heed::{BytesDecode, BytesEncode};
|
use heed::{BytesDecode, BytesEncode};
|
||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
@ -15,6 +16,7 @@ use crate::error::SerializationError;
|
|||||||
use crate::heed_codec::StrBEU16Codec;
|
use crate::heed_codec::StrBEU16Codec;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
|
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||||
use crate::update::index_documents::helpers::sorter_into_reader;
|
use crate::update::index_documents::helpers::sorter_into_reader;
|
||||||
use crate::update::settings::InnerIndexSettingsDiff;
|
use crate::update::settings::InnerIndexSettingsDiff;
|
||||||
use crate::update::MergeFn;
|
use crate::update::MergeFn;
|
||||||
@ -38,9 +40,8 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
grenad::Reader<BufReader<File>>,
|
grenad::Reader<BufReader<File>>,
|
||||||
)> {
|
)> {
|
||||||
let max_memory = indexer.max_memory_by_thread();
|
let max_memory = indexer.max_memory_by_thread();
|
||||||
let mut conn = REDIS_CLIENT.get_connection().unwrap();
|
|
||||||
|
|
||||||
let mut word_fid_docids_sorter = create_sorter(
|
let word_fid_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_deladd_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
@ -48,6 +49,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|m| m / 3),
|
max_memory.map(|m| m / 3),
|
||||||
);
|
);
|
||||||
|
let mut cached_word_fid_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, _>::new(
|
||||||
|
NonZeroUsize::new(300).unwrap(),
|
||||||
|
word_fid_docids_sorter,
|
||||||
|
REDIS_CLIENT.get_connection().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
let mut del_words = BTreeSet::new();
|
let mut del_words = BTreeSet::new();
|
||||||
let mut add_words = BTreeSet::new();
|
let mut add_words = BTreeSet::new();
|
||||||
@ -81,8 +88,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
&mut key_buffer,
|
&mut key_buffer,
|
||||||
&del_words,
|
&del_words,
|
||||||
&add_words,
|
&add_words,
|
||||||
&mut word_fid_docids_sorter,
|
&mut cached_word_fid_docids_sorter,
|
||||||
&mut conn,
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
del_words.clear();
|
del_words.clear();
|
||||||
@ -95,7 +101,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
tempfile::tempfile()?,
|
tempfile::tempfile()?,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut word_docids_sorter = create_sorter(
|
let word_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_deladd_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
@ -103,8 +109,13 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|m| m / 3),
|
max_memory.map(|m| m / 3),
|
||||||
);
|
);
|
||||||
|
let mut cached_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||||
|
NonZeroUsize::new(100).unwrap(),
|
||||||
|
word_docids_sorter,
|
||||||
|
REDIS_CLIENT.get_connection().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
let mut exact_word_docids_sorter = create_sorter(
|
let exact_word_docids_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Unstable,
|
grenad::SortAlgorithm::Unstable,
|
||||||
merge_deladd_cbo_roaring_bitmaps,
|
merge_deladd_cbo_roaring_bitmaps,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
@ -112,9 +123,13 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|m| m / 3),
|
max_memory.map(|m| m / 3),
|
||||||
);
|
);
|
||||||
|
let mut cached_exact_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||||
|
NonZeroUsize::new(100).unwrap(),
|
||||||
|
exact_word_docids_sorter,
|
||||||
|
REDIS_CLIENT.get_connection().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
|
let mut iter = cached_word_fid_docids_sorter.into_sorter()?.into_stream_merger_iter()?;
|
||||||
let mut buffer = Vec::new();
|
|
||||||
// NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters.
|
// NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters.
|
||||||
while let Some((key, value)) = iter.next()? {
|
while let Some((key, value)) = iter.next()? {
|
||||||
// only keep the value if their is a change to apply in the DB.
|
// only keep the value if their is a change to apply in the DB.
|
||||||
@ -127,36 +142,31 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
|
|
||||||
// merge all deletions
|
// merge all deletions
|
||||||
let obkv = KvReaderDelAdd::new(value);
|
let obkv = KvReaderDelAdd::new(value);
|
||||||
|
let w = w.as_bytes();
|
||||||
if let Some(value) = obkv.get(DelAdd::Deletion) {
|
if let Some(value) = obkv.get(DelAdd::Deletion) {
|
||||||
let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
|
let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
|
||||||
buffer.clear();
|
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
|
||||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
|
||||||
obkv.insert(DelAdd::Deletion, value)?;
|
|
||||||
redis::cmd("INCR").arg(w.as_bytes()).query::<usize>(&mut conn).unwrap();
|
|
||||||
if delete_from_exact {
|
if delete_from_exact {
|
||||||
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
cached_exact_word_docids_sorter.insert_del(w, bitmap)?;
|
||||||
} else {
|
} else {
|
||||||
word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
cached_word_docids_sorter.insert_del(w, bitmap)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// merge all additions
|
// merge all additions
|
||||||
if let Some(value) = obkv.get(DelAdd::Addition) {
|
if let Some(value) = obkv.get(DelAdd::Addition) {
|
||||||
let add_in_exact = settings_diff.new.exact_attributes.contains(&fid);
|
let add_in_exact = settings_diff.new.exact_attributes.contains(&fid);
|
||||||
buffer.clear();
|
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
|
||||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
|
||||||
obkv.insert(DelAdd::Addition, value)?;
|
|
||||||
redis::cmd("INCR").arg(w.as_bytes()).query::<usize>(&mut conn).unwrap();
|
|
||||||
if add_in_exact {
|
if add_in_exact {
|
||||||
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
cached_exact_word_docids_sorter.insert_add(w, bitmap)?;
|
||||||
} else {
|
} else {
|
||||||
word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
cached_word_docids_sorter.insert_add(w, bitmap)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
sorter_into_reader(word_docids_sorter, indexer)?,
|
sorter_into_reader(cached_word_docids_sorter.into_sorter()?, indexer)?,
|
||||||
sorter_into_reader(exact_word_docids_sorter, indexer)?,
|
sorter_into_reader(cached_exact_word_docids_sorter.into_sorter()?, indexer)?,
|
||||||
writer_into_reader(word_fid_docids_writer)?,
|
writer_into_reader(word_fid_docids_writer)?,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
@ -168,38 +178,34 @@ fn words_into_sorter(
|
|||||||
key_buffer: &mut Vec<u8>,
|
key_buffer: &mut Vec<u8>,
|
||||||
del_words: &BTreeSet<Vec<u8>>,
|
del_words: &BTreeSet<Vec<u8>>,
|
||||||
add_words: &BTreeSet<Vec<u8>>,
|
add_words: &BTreeSet<Vec<u8>>,
|
||||||
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
cached_word_fid_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
|
||||||
conn: &mut redis::Connection,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
use itertools::merge_join_by;
|
use itertools::merge_join_by;
|
||||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
let mut buffer = Vec::new();
|
|
||||||
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
|
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
|
||||||
buffer.clear();
|
|
||||||
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
|
||||||
let word_bytes = match eob {
|
let word_bytes = match eob {
|
||||||
Left(word_bytes) => {
|
Left(word_bytes) => word_bytes,
|
||||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
Right(word_bytes) => word_bytes,
|
||||||
word_bytes
|
Both(word_bytes, _) => word_bytes,
|
||||||
}
|
|
||||||
Right(word_bytes) => {
|
|
||||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
|
||||||
word_bytes
|
|
||||||
}
|
|
||||||
Both(word_bytes, _) => {
|
|
||||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
|
||||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
|
||||||
word_bytes
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
key_buffer.clear();
|
key_buffer.clear();
|
||||||
key_buffer.extend_from_slice(word_bytes);
|
key_buffer.extend_from_slice(word_bytes);
|
||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
||||||
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
|
|
||||||
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
match eob {
|
||||||
|
Left(_) => {
|
||||||
|
cached_word_fid_docids_sorter.insert_del_u32(key_buffer, document_id)?;
|
||||||
|
}
|
||||||
|
Right(_) => {
|
||||||
|
cached_word_fid_docids_sorter.insert_add_u32(key_buffer, document_id)?;
|
||||||
|
}
|
||||||
|
Both(_, _) => {
|
||||||
|
cached_word_fid_docids_sorter.insert_del_add_u32(key_buffer, document_id)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
Loading…
Reference in New Issue
Block a user