Write the inverted indexes in memory and never on disk

This commit is contained in:
Clément Renault 2024-09-25 18:13:19 +02:00
parent 3d244451df
commit 15bf556291
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
10 changed files with 245 additions and 393 deletions

View File

@ -1,16 +1,13 @@
use std::fs::File;
use std::marker::PhantomData; use std::marker::PhantomData;
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use grenad::Merger;
use heed::types::Bytes; use heed::types::Bytes;
use memmap2::Mmap; use memmap2::Mmap;
use super::extract::FacetKind; use super::extract::{FacetKind, HashMapMerger};
use super::StdResult; use super::StdResult;
use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY}; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
use crate::update::new::KvReaderFieldId; use crate::update::new::KvReaderFieldId;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{DocumentId, Index}; use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
@ -279,7 +276,7 @@ pub trait DatabaseType {
} }
pub trait MergerOperationType { pub trait MergerOperationType {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation; fn new_merger_operation(merger: HashMapMerger) -> MergerOperation;
} }
impl DatabaseType for ExactWordDocids { impl DatabaseType for ExactWordDocids {
@ -287,7 +284,7 @@ impl DatabaseType for ExactWordDocids {
} }
impl MergerOperationType for ExactWordDocids { impl MergerOperationType for ExactWordDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::ExactWordDocidsMerger(merger) MergerOperation::ExactWordDocidsMerger(merger)
} }
} }
@ -297,7 +294,7 @@ impl DatabaseType for FidWordCountDocids {
} }
impl MergerOperationType for FidWordCountDocids { impl MergerOperationType for FidWordCountDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::FidWordCountDocidsMerger(merger) MergerOperation::FidWordCountDocidsMerger(merger)
} }
} }
@ -307,7 +304,7 @@ impl DatabaseType for WordDocids {
} }
impl MergerOperationType for WordDocids { impl MergerOperationType for WordDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::WordDocidsMerger(merger) MergerOperation::WordDocidsMerger(merger)
} }
} }
@ -317,7 +314,7 @@ impl DatabaseType for WordFidDocids {
} }
impl MergerOperationType for WordFidDocids { impl MergerOperationType for WordFidDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::WordFidDocidsMerger(merger) MergerOperation::WordFidDocidsMerger(merger)
} }
} }
@ -327,7 +324,7 @@ impl DatabaseType for WordPairProximityDocids {
} }
impl MergerOperationType for WordPairProximityDocids { impl MergerOperationType for WordPairProximityDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::WordPairProximityDocidsMerger(merger) MergerOperation::WordPairProximityDocidsMerger(merger)
} }
} }
@ -337,13 +334,13 @@ impl DatabaseType for WordPositionDocids {
} }
impl MergerOperationType for WordPositionDocids { impl MergerOperationType for WordPositionDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::WordPositionDocidsMerger(merger) MergerOperation::WordPositionDocidsMerger(merger)
} }
} }
impl MergerOperationType for FacetDocids { impl MergerOperationType for FacetDocids {
fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation { fn new_merger_operation(merger: HashMapMerger) -> MergerOperation {
MergerOperation::FacetDocidsMerger(merger) MergerOperation::FacetDocidsMerger(merger)
} }
} }
@ -442,13 +439,13 @@ impl DocumentsSender<'_> {
} }
pub enum MergerOperation { pub enum MergerOperation {
ExactWordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), ExactWordDocidsMerger(HashMapMerger),
FidWordCountDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), FidWordCountDocidsMerger(HashMapMerger),
WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), WordDocidsMerger(HashMapMerger),
WordFidDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), WordFidDocidsMerger(HashMapMerger),
WordPairProximityDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), WordPairProximityDocidsMerger(HashMapMerger),
WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), WordPositionDocidsMerger(HashMapMerger),
FacetDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), FacetDocidsMerger(HashMapMerger),
DeleteDocument { docid: DocumentId }, DeleteDocument { docid: DocumentId },
InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> }, InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> },
FinishedDocument, FinishedDocument,
@ -474,7 +471,7 @@ impl ExtractorSender {
pub fn send_searchable<D: MergerOperationType>( pub fn send_searchable<D: MergerOperationType>(
&self, &self,
merger: Merger<File, MergeDeladdCboRoaringBitmaps>, merger: HashMapMerger,
) -> StdResult<(), SendError<()>> { ) -> StdResult<(), SendError<()>> {
match self.0.send(D::new_merger_operation(merger)) { match self.0.send(D::new_merger_operation(merger)) {
Ok(()) => Ok(()), Ok(()) => Ok(()),

View File

@ -1,183 +1,96 @@
use std::mem; use std::collections::HashMap;
use std::num::NonZeroUsize;
use grenad::{MergeFunction, Sorter};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use smallvec::SmallVec; use smallvec::SmallVec;
use super::lru::Lru; pub const KEY_SIZE: usize = 12;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::CboRoaringBitmapCodec;
const KEY_SIZE: usize = 12;
#[derive(Debug)] #[derive(Debug)]
pub struct CboCachedSorter<MF> { pub struct CboCachedSorter {
cache: Lru<SmallVec<[u8; KEY_SIZE]>, DelAddRoaringBitmap>, cache: HashMap<SmallVec<[u8; KEY_SIZE]>, DelAddRoaringBitmap>,
sorter: Sorter<MF>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
total_insertions: usize, total_insertions: usize,
fitted_in_key: usize, fitted_in_key: usize,
} }
impl<MF> CboCachedSorter<MF> { impl CboCachedSorter {
pub fn new(cap: NonZeroUsize, sorter: Sorter<MF>) -> Self { pub fn new() -> Self {
CboCachedSorter { CboCachedSorter { cache: HashMap::new(), total_insertions: 0, fitted_in_key: 0 }
cache: Lru::new(cap),
sorter,
deladd_buffer: Vec::new(),
cbo_buffer: Vec::new(),
total_insertions: 0,
fitted_in_key: 0,
}
} }
} }
impl<MF: MergeFunction> CboCachedSorter<MF> { impl CboCachedSorter {
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { pub fn insert_del_u32(&mut self, key: &[u8], n: u32) {
match self.cache.get_mut(key) { match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del, add: _ }) => { Some(DelAddRoaringBitmap { del, add: _ }) => {
del.get_or_insert_with(PushOptimizedBitmap::default).insert(n); del.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
None => { None => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del_u32(n); let value = DelAddRoaringBitmap::new_del_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { assert!(self.cache.insert(key.into(), value).is_none());
self.write_entry(key, deladd)?;
}
} }
} }
Ok(())
} }
pub fn insert_del( pub fn insert_del(&mut self, key: &[u8], bitmap: RoaringBitmap) {
&mut self,
key: &[u8],
bitmap: RoaringBitmap,
) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) { match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del, add: _ }) => { Some(DelAddRoaringBitmap { del, add: _ }) => {
del.get_or_insert_with(PushOptimizedBitmap::default).union_with_bitmap(bitmap); *del.get_or_insert_with(RoaringBitmap::default) |= bitmap;
} }
None => { None => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del(bitmap); let value = DelAddRoaringBitmap::new_del(bitmap);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { assert!(self.cache.insert(key.into(), value).is_none());
self.write_entry(key, deladd)?;
}
} }
} }
Ok(())
} }
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { pub fn insert_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.get_mut(key) { match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del: _, add }) => { Some(DelAddRoaringBitmap { del: _, add }) => {
add.get_or_insert_with(PushOptimizedBitmap::default).insert(n); add.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
None => { None => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_add_u32(n); let value = DelAddRoaringBitmap::new_add_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { assert!(self.cache.insert(key.into(), value).is_none());
self.write_entry(key, deladd)?;
}
} }
} }
Ok(())
} }
pub fn insert_add( pub fn insert_add(&mut self, key: &[u8], bitmap: RoaringBitmap) {
&mut self,
key: &[u8],
bitmap: RoaringBitmap,
) -> grenad::Result<(), MF::Error> {
match self.cache.get_mut(key) { match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del: _, add }) => { Some(DelAddRoaringBitmap { del: _, add }) => {
add.get_or_insert_with(PushOptimizedBitmap::default).union_with_bitmap(bitmap); *add.get_or_insert_with(RoaringBitmap::default) |= bitmap;
} }
None => { None => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_add(bitmap); let value = DelAddRoaringBitmap::new_add(bitmap);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { assert!(self.cache.insert(key.into(), value).is_none());
self.write_entry(key, deladd)?;
}
} }
} }
Ok(())
} }
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> grenad::Result<(), MF::Error> { pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) {
match self.cache.get_mut(key) { match self.cache.get_mut(key) {
Some(DelAddRoaringBitmap { del, add }) => { Some(DelAddRoaringBitmap { del, add }) => {
del.get_or_insert_with(PushOptimizedBitmap::default).insert(n); del.get_or_insert_with(RoaringBitmap::default).insert(n);
add.get_or_insert_with(PushOptimizedBitmap::default).insert(n); add.get_or_insert_with(RoaringBitmap::default).insert(n);
} }
None => { None => {
self.total_insertions += 1; self.total_insertions += 1;
self.fitted_in_key += (key.len() <= KEY_SIZE) as usize; self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
let value = DelAddRoaringBitmap::new_del_add_u32(n); let value = DelAddRoaringBitmap::new_del_add_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) { assert!(self.cache.insert(key.into(), value).is_none());
self.write_entry(key, deladd)?;
}
} }
} }
Ok(())
} }
fn write_entry<A: AsRef<[u8]>>( pub fn into_sorter(self) -> HashMap<SmallVec<[u8; KEY_SIZE]>, DelAddRoaringBitmap> {
&mut self,
key: A,
deladd: DelAddRoaringBitmap,
) -> grenad::Result<(), MF::Error> {
/// TODO we must create a serialization trait to correctly serialize bitmaps
self.deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del.bitmap, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add.bitmap, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
}
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del.bitmap, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add.bitmap, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
}
let bytes = value_writer.into_inner().unwrap();
self.sorter.insert(key, bytes)
}
pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> grenad::Result<(), MF::Error> {
self.sorter.insert(key, val)
}
pub fn into_sorter(mut self) -> grenad::Result<Sorter<MF>, MF::Error> {
let default_arc = Lru::new(NonZeroUsize::MIN);
for (key, deladd) in mem::replace(&mut self.cache, default_arc) {
self.write_entry(key, deladd)?;
}
eprintln!( eprintln!(
"LruCache stats: {} <= {KEY_SIZE} bytes ({}%) on a total of {} insertions", "LruCache stats: {} <= {KEY_SIZE} bytes ({}%) on a total of {} insertions",
self.fitted_in_key, self.fitted_in_key,
@ -185,66 +98,52 @@ impl<MF: MergeFunction> CboCachedSorter<MF> {
self.total_insertions, self.total_insertions,
); );
Ok(self.sorter) self.cache
} }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone, Default)]
pub struct DelAddRoaringBitmap { pub struct DelAddRoaringBitmap {
pub(crate) del: Option<PushOptimizedBitmap>, pub(crate) del: Option<RoaringBitmap>,
pub(crate) add: Option<PushOptimizedBitmap>, pub(crate) add: Option<RoaringBitmap>,
} }
impl DelAddRoaringBitmap { impl DelAddRoaringBitmap {
fn new_del_add_u32(n: u32) -> Self { fn new_del_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { DelAddRoaringBitmap {
del: Some(PushOptimizedBitmap::from_single(n)), del: Some(RoaringBitmap::from([n])),
add: Some(PushOptimizedBitmap::from_single(n)), add: Some(RoaringBitmap::from([n])),
} }
} }
fn new_del(bitmap: RoaringBitmap) -> Self { fn new_del(bitmap: RoaringBitmap) -> Self {
DelAddRoaringBitmap { del: Some(PushOptimizedBitmap::from_bitmap(bitmap)), add: None } DelAddRoaringBitmap { del: Some(bitmap), add: None }
} }
fn new_del_u32(n: u32) -> Self { fn new_del_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: Some(PushOptimizedBitmap::from_single(n)), add: None } DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None }
} }
fn new_add(bitmap: RoaringBitmap) -> Self { fn new_add(bitmap: RoaringBitmap) -> Self {
DelAddRoaringBitmap { del: None, add: Some(PushOptimizedBitmap::from_bitmap(bitmap)) } DelAddRoaringBitmap { del: None, add: Some(bitmap) }
} }
fn new_add_u32(n: u32) -> Self { fn new_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: None, add: Some(PushOptimizedBitmap::from_single(n)) } DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
} }
}
pub fn merge_with(&mut self, other: &DelAddRoaringBitmap) {
#[derive(Debug, Clone, Default)] self.del = match (&self.del, &other.del) {
struct PushOptimizedBitmap { (None, None) => None,
bitmap: RoaringBitmap, (None, Some(other)) => Some(other.clone()),
} (Some(this), None) => Some(this.clone()),
(Some(this), Some(other)) => Some(this | other),
impl PushOptimizedBitmap { };
#[inline] self.add = match (&self.add, &other.add) {
fn from_bitmap(bitmap: RoaringBitmap) -> PushOptimizedBitmap { (None, None) => None,
PushOptimizedBitmap { bitmap } (None, Some(other)) => Some(other.clone()),
} (Some(this), None) => Some(this.clone()),
(Some(this), Some(other)) => Some(this | other),
#[inline] };
fn from_single(single: u32) -> PushOptimizedBitmap {
PushOptimizedBitmap { bitmap: RoaringBitmap::from([single]) }
}
#[inline]
fn insert(&mut self, n: u32) {
if !self.bitmap.push(n) {
self.bitmap.insert(n);
}
}
#[inline]
fn union_with_bitmap(&mut self, bitmap: RoaringBitmap) {
self.bitmap |= bitmap;
} }
} }

View File

@ -1,8 +1,5 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug;
use std::fs::File;
use grenad::{MergeFunction, Merger};
use heed::RoTxn; use heed::RoTxn;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use serde_json::Value; use serde_json::Value;
@ -11,9 +8,9 @@ use super::super::cache::CboCachedSorter;
use super::facet_document::extract_document_facets; use super::facet_document::extract_document_facets;
use super::FacetKind; use super::FacetKind;
use crate::facet::value_encoding::f64_into_bytes; use crate::facet::value_encoding::f64_into_bytes;
use crate::update::new::extract::DocidsExtractor; use crate::update::new::extract::{DocidsExtractor, HashMapMerger};
use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::new::{DocumentChange, ItemsPool};
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::GrenadParameters;
use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH}; use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedDocidsExtractor; pub struct FacetedDocidsExtractor;
@ -24,7 +21,7 @@ impl FacetedDocidsExtractor {
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
fields_ids_map: &mut GlobalFieldsIdsMap, fields_ids_map: &mut GlobalFieldsIdsMap,
attributes_to_extract: &[&str], attributes_to_extract: &[&str],
cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>, cached_sorter: &mut CboCachedSorter,
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {
match document_change { match document_change {
@ -94,25 +91,20 @@ impl FacetedDocidsExtractor {
} }
} }
fn facet_fn_with_options<MF>( fn facet_fn_with_options(
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
cached_sorter: &mut CboCachedSorter<MF>, cached_sorter: &mut CboCachedSorter,
cache_fn: impl Fn(&mut CboCachedSorter<MF>, &[u8], u32) -> grenad::Result<(), MF::Error>, cache_fn: impl Fn(&mut CboCachedSorter, &[u8], u32),
docid: DocumentId, docid: DocumentId,
fid: FieldId, fid: FieldId,
value: &Value, value: &Value,
) -> Result<()> ) -> Result<()> {
where
MF: MergeFunction,
MF::Error: Debug,
grenad::Error<MF::Error>: Into<crate::Error>,
{
// Exists // Exists
// key: fid // key: fid
buffer.clear(); buffer.clear();
buffer.push(FacetKind::Exists as u8); buffer.push(FacetKind::Exists as u8);
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)?; cache_fn(cached_sorter, &*buffer, docid);
match value { match value {
// Number // Number
@ -128,7 +120,7 @@ impl FacetedDocidsExtractor {
buffer.extend_from_slice(&ordered); buffer.extend_from_slice(&ordered);
buffer.extend_from_slice(&n.to_be_bytes()); buffer.extend_from_slice(&n.to_be_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) Ok(cache_fn(cached_sorter, &*buffer, docid))
} else { } else {
Ok(()) Ok(())
} }
@ -142,7 +134,7 @@ impl FacetedDocidsExtractor {
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(1); // level 0 buffer.push(1); // level 0
buffer.extend_from_slice(truncated.as_bytes()); buffer.extend_from_slice(truncated.as_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) Ok(cache_fn(cached_sorter, &*buffer, docid))
} }
// Null // Null
// key: fid // key: fid
@ -150,7 +142,7 @@ impl FacetedDocidsExtractor {
buffer.clear(); buffer.clear();
buffer.push(FacetKind::Null as u8); buffer.push(FacetKind::Null as u8);
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) Ok(cache_fn(cached_sorter, &*buffer, docid))
} }
// Empty // Empty
// key: fid // key: fid
@ -158,13 +150,13 @@ impl FacetedDocidsExtractor {
buffer.clear(); buffer.clear();
buffer.push(FacetKind::Empty as u8); buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) Ok(cache_fn(cached_sorter, &*buffer, docid))
} }
Value::Object(o) if o.is_empty() => { Value::Object(o) if o.is_empty() => {
buffer.clear(); buffer.clear();
buffer.push(FacetKind::Empty as u8); buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) Ok(cache_fn(cached_sorter, &*buffer, docid))
} }
// Otherwise, do nothing // Otherwise, do nothing
/// TODO: What about Value::Bool? /// TODO: What about Value::Bool?
@ -196,7 +188,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
fields_ids_map: &GlobalFieldsIdsMap, fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters, indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { ) -> Result<HashMapMerger> {
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
@ -205,23 +197,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
attributes_to_extract.iter().map(|s| s.as_ref()).collect(); attributes_to_extract.iter().map(|s| s.as_ref()).collect();
let context_pool = ItemsPool::new(|| { let context_pool = ItemsPool::new(|| {
Ok(( Ok((index.read_txn()?, fields_ids_map.clone(), Vec::new(), CboCachedSorter::new()))
index.read_txn()?,
fields_ids_map.clone(),
Vec::new(),
CboCachedSorter::new(
// TODO use a better value
100.try_into().unwrap(),
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
),
))
}); });
{ {
@ -243,7 +219,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
})?; })?;
} }
{ {
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); let mut builder = HashMapMerger::new();
let span = let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter(); let _entered = span.enter();
@ -252,14 +228,13 @@ impl DocidsExtractor for FacetedDocidsExtractor {
.into_items() .into_items()
.par_bridge() .par_bridge()
.map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
let sorter = cached_sorter.into_sorter()?; cached_sorter.into_sorter()
sorter.into_reader_cursors()
}) })
.collect(); .collect();
for reader in readers {
builder.extend(reader?); builder.extend(readers);
}
Ok(builder.build()) Ok(builder)
} }
} }
} }

View File

@ -3,12 +3,15 @@ mod faceted;
mod lru; mod lru;
mod searchable; mod searchable;
use std::fs::File; use std::collections::HashMap;
use std::mem;
pub use faceted::*; pub use faceted::*;
use grenad::Merger; use grenad::MergeFunction;
use rayon::iter::IntoParallelIterator; use rayon::iter::IntoParallelIterator;
use rayon::slice::ParallelSliceMut as _;
pub use searchable::*; pub use searchable::*;
use smallvec::SmallVec;
use super::DocumentChange; use super::DocumentChange;
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
@ -20,7 +23,67 @@ pub trait DocidsExtractor {
fields_ids_map: &GlobalFieldsIdsMap, fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters, indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>; ) -> Result<HashMapMerger>;
}
pub struct HashMapMerger {
maps: Vec<HashMap<SmallVec<[u8; cache::KEY_SIZE]>, cache::DelAddRoaringBitmap>>,
}
impl HashMapMerger {
pub fn new() -> HashMapMerger {
HashMapMerger { maps: Vec::new() }
}
pub fn extend<I>(&mut self, iter: I)
where
I: IntoIterator<
Item = HashMap<SmallVec<[u8; cache::KEY_SIZE]>, cache::DelAddRoaringBitmap>,
>,
{
self.maps.extend(iter);
}
pub fn iter<'h>(&'h self) -> Iter<'h> {
let mut entries: Vec<_> =
self.maps.iter().map(|m| m.iter()).flatten().map(|(k, v)| (k.as_slice(), v)).collect();
entries.par_sort_unstable_by_key(|(key, _)| *key);
Iter {
sorted_entries: entries.into_iter(),
current_key: None,
current_deladd: cache::DelAddRoaringBitmap::default(),
}
}
}
pub struct Iter<'h> {
sorted_entries: std::vec::IntoIter<(&'h [u8], &'h cache::DelAddRoaringBitmap)>,
current_key: Option<&'h [u8]>,
current_deladd: cache::DelAddRoaringBitmap,
}
impl<'h> Iterator for Iter<'h> {
type Item = (&'h [u8], cache::DelAddRoaringBitmap);
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.sorted_entries.next() {
Some((k, other)) => {
if self.current_key == Some(k) {
self.current_deladd.merge_with(other);
} else {
let previous_key = self.current_key.replace(k);
let previous_deladd = mem::replace(&mut self.current_deladd, other.clone());
return previous_key.map(|ck| (ck, previous_deladd));
}
}
None => {
let current_deladd = mem::take(&mut self.current_deladd);
return self.current_key.map(|ck| (ck, current_deladd));
}
}
}
}
} }
/// TODO move in permissive json pointer /// TODO move in permissive json pointer

View File

@ -31,7 +31,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
index: &Index, index: &Index,
document_tokenizer: &DocumentTokenizer, document_tokenizer: &DocumentTokenizer,
fields_ids_map: &mut GlobalFieldsIdsMap, fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>, cached_sorter: &mut CboCachedSorter,
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
@ -52,7 +52,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
for (fid, count) in fid_word_count.iter() { for (fid, count) in fid_word_count.iter() {
if *count <= MAX_COUNTED_WORDS { if *count <= MAX_COUNTED_WORDS {
let key = build_key(*fid, *count as u8, &mut key_buffer); let key = build_key(*fid, *count as u8, &mut key_buffer);
cached_sorter.insert_del_u32(key, inner.docid())?; cached_sorter.insert_del_u32(key, inner.docid());
} }
} }
} }
@ -85,11 +85,11 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
if *current_count != *new_count { if *current_count != *new_count {
if *current_count <= MAX_COUNTED_WORDS { if *current_count <= MAX_COUNTED_WORDS {
let key = build_key(*fid, *current_count as u8, &mut key_buffer); let key = build_key(*fid, *current_count as u8, &mut key_buffer);
cached_sorter.insert_del_u32(key, inner.docid())?; cached_sorter.insert_del_u32(key, inner.docid());
} }
if *new_count <= MAX_COUNTED_WORDS { if *new_count <= MAX_COUNTED_WORDS {
let key = build_key(*fid, *new_count as u8, &mut key_buffer); let key = build_key(*fid, *new_count as u8, &mut key_buffer);
cached_sorter.insert_add_u32(key, inner.docid())?; cached_sorter.insert_add_u32(key, inner.docid());
} }
} }
} }
@ -106,7 +106,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
for (fid, count) in fid_word_count.iter() { for (fid, count) in fid_word_count.iter() {
if *count <= MAX_COUNTED_WORDS { if *count <= MAX_COUNTED_WORDS {
let key = build_key(*fid, *count as u8, &mut key_buffer); let key = build_key(*fid, *count as u8, &mut key_buffer);
cached_sorter.insert_add_u32(key, inner.docid())?; cached_sorter.insert_add_u32(key, inner.docid());
} }
} }
} }

View File

@ -11,6 +11,7 @@ use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::SearchableExtractor; use super::SearchableExtractor;
use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::extract::perm_json_p::contained_in;
use crate::update::new::extract::HashMapMerger;
use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::new::{DocumentChange, ItemsPool};
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{ use crate::{
@ -39,14 +40,14 @@ where
index: &Index, index: &Index,
document_tokenizer: &DocumentTokenizer, document_tokenizer: &DocumentTokenizer,
fields_ids_map: &mut GlobalFieldsIdsMap, fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>, cached_sorter: &mut CboCachedSorter,
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {
match document_change { match document_change {
DocumentChange::Deletion(inner) => { DocumentChange::Deletion(inner) => {
let mut token_fn = |_fname: &str, fid, pos, word: &str| { let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word); let key = Self::build_key(fid, pos, word);
cached_sorter.insert_del_u32(&key, inner.docid()).map_err(crate::Error::from) Ok(cached_sorter.insert_del_u32(&key, inner.docid()))
}; };
document_tokenizer.tokenize_document( document_tokenizer.tokenize_document(
inner.current(rtxn, index)?.unwrap(), inner.current(rtxn, index)?.unwrap(),
@ -57,7 +58,7 @@ where
DocumentChange::Update(inner) => { DocumentChange::Update(inner) => {
let mut token_fn = |_fname: &str, fid, pos, word: &str| { let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word); let key = Self::build_key(fid, pos, word);
cached_sorter.insert_del_u32(&key, inner.docid()).map_err(crate::Error::from) Ok(cached_sorter.insert_del_u32(&key, inner.docid()))
}; };
document_tokenizer.tokenize_document( document_tokenizer.tokenize_document(
inner.current(rtxn, index)?.unwrap(), inner.current(rtxn, index)?.unwrap(),
@ -67,14 +68,14 @@ where
let mut token_fn = |_fname: &str, fid, pos, word: &str| { let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word); let key = Self::build_key(fid, pos, word);
cached_sorter.insert_add_u32(&key, inner.docid()).map_err(crate::Error::from) Ok(cached_sorter.insert_add_u32(&key, inner.docid()))
}; };
document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?; document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
} }
DocumentChange::Insertion(inner) => { DocumentChange::Insertion(inner) => {
let mut token_fn = |_fname: &str, fid, pos, word: &str| { let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word); let key = Self::build_key(fid, pos, word);
cached_sorter.insert_add_u32(&key, inner.docid()).map_err(crate::Error::from) Ok(cached_sorter.insert_add_u32(&key, inner.docid()))
}; };
document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?; document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
} }
@ -193,11 +194,11 @@ impl ProtoWordDocidsExtractor for WordPositionDocidsExtractor {
// V2 // V2
struct WordDocidsCachedSorters { struct WordDocidsCachedSorters {
word_fid_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>, word_fid_docids: CboCachedSorter,
word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>, word_docids: CboCachedSorter,
exact_word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>, exact_word_docids: CboCachedSorter,
word_position_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>, word_position_docids: CboCachedSorter,
fid_word_count_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>, fid_word_count_docids: CboCachedSorter,
fid_word_count: HashMap<FieldId, (usize, usize)>, fid_word_count: HashMap<FieldId, (usize, usize)>,
current_docid: Option<DocumentId>, current_docid: Option<DocumentId>,
} }
@ -210,61 +211,11 @@ impl WordDocidsCachedSorters {
) -> Self { ) -> Self {
let max_memory = max_memory.map(|max_memory| max_memory / 4); let max_memory = max_memory.map(|max_memory| max_memory / 4);
let word_fid_docids = CboCachedSorter::new( let word_fid_docids = CboCachedSorter::new();
capacity, let word_docids = CboCachedSorter::new();
create_sorter( let exact_word_docids = CboCachedSorter::new();
grenad::SortAlgorithm::Stable, let word_position_docids = CboCachedSorter::new();
MergeDeladdCboRoaringBitmaps, let fid_word_count_docids = CboCachedSorter::new();
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let word_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let exact_word_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let word_position_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let fid_word_count_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
Self { Self {
word_fid_docids, word_fid_docids,
@ -288,22 +239,22 @@ impl WordDocidsCachedSorters {
) -> Result<()> { ) -> Result<()> {
let key = word.as_bytes(); let key = word.as_bytes();
if exact { if exact {
self.exact_word_docids.insert_add_u32(key, docid)?; self.exact_word_docids.insert_add_u32(key, docid);
} else { } else {
self.word_docids.insert_add_u32(key, docid)?; self.word_docids.insert_add_u32(key, docid);
} }
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word.as_bytes());
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&position.to_be_bytes()); buffer.extend_from_slice(&position.to_be_bytes());
self.word_fid_docids.insert_add_u32(buffer, docid)?; self.word_fid_docids.insert_add_u32(buffer, docid);
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word.as_bytes());
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&field_id.to_be_bytes()); buffer.extend_from_slice(&field_id.to_be_bytes());
self.word_position_docids.insert_add_u32(buffer, docid)?; self.word_position_docids.insert_add_u32(buffer, docid);
if self.current_docid.map_or(false, |id| docid != id) { if self.current_docid.map_or(false, |id| docid != id) {
self.flush_fid_word_count(buffer)?; self.flush_fid_word_count(buffer)?;
@ -329,22 +280,22 @@ impl WordDocidsCachedSorters {
) -> Result<()> { ) -> Result<()> {
let key = word.as_bytes(); let key = word.as_bytes();
if exact { if exact {
self.exact_word_docids.insert_del_u32(key, docid)?; self.exact_word_docids.insert_del_u32(key, docid);
} else { } else {
self.word_docids.insert_del_u32(key, docid)?; self.word_docids.insert_del_u32(key, docid);
} }
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word.as_bytes());
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&position.to_be_bytes()); buffer.extend_from_slice(&position.to_be_bytes());
self.word_fid_docids.insert_del_u32(buffer, docid)?; self.word_fid_docids.insert_del_u32(buffer, docid);
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word.as_bytes());
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&field_id.to_be_bytes()); buffer.extend_from_slice(&field_id.to_be_bytes());
self.word_position_docids.insert_del_u32(buffer, docid)?; self.word_position_docids.insert_del_u32(buffer, docid);
if self.current_docid.map_or(false, |id| docid != id) { if self.current_docid.map_or(false, |id| docid != id) {
self.flush_fid_word_count(buffer)?; self.flush_fid_word_count(buffer)?;
@ -366,15 +317,13 @@ impl WordDocidsCachedSorters {
buffer.clear(); buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(current_count as u8); buffer.push(current_count as u8);
self.fid_word_count_docids self.fid_word_count_docids.insert_del_u32(buffer, self.current_docid.unwrap());
.insert_del_u32(buffer, self.current_docid.unwrap())?;
} }
if new_count <= MAX_COUNTED_WORDS { if new_count <= MAX_COUNTED_WORDS {
buffer.clear(); buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes()); buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(new_count as u8); buffer.push(new_count as u8);
self.fid_word_count_docids self.fid_word_count_docids.insert_add_u32(buffer, self.current_docid.unwrap());
.insert_add_u32(buffer, self.current_docid.unwrap())?;
} }
} }
} }
@ -384,29 +333,29 @@ impl WordDocidsCachedSorters {
} }
struct WordDocidsMergerBuilders { struct WordDocidsMergerBuilders {
word_fid_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>, word_fid_docids: HashMapMerger,
word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>, word_docids: HashMapMerger,
exact_word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>, exact_word_docids: HashMapMerger,
word_position_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>, word_position_docids: HashMapMerger,
fid_word_count_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>, fid_word_count_docids: HashMapMerger,
} }
pub struct WordDocidsMergers { pub struct WordDocidsMergers {
pub word_fid_docids: Merger<File, MergeDeladdCboRoaringBitmaps>, pub word_fid_docids: HashMapMerger,
pub word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>, pub word_docids: HashMapMerger,
pub exact_word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>, pub exact_word_docids: HashMapMerger,
pub word_position_docids: Merger<File, MergeDeladdCboRoaringBitmaps>, pub word_position_docids: HashMapMerger,
pub fid_word_count_docids: Merger<File, MergeDeladdCboRoaringBitmaps>, pub fid_word_count_docids: HashMapMerger,
} }
impl WordDocidsMergerBuilders { impl WordDocidsMergerBuilders {
fn new() -> Self { fn new() -> Self {
Self { Self {
word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_fid_docids: HashMapMerger::new(),
word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_docids: HashMapMerger::new(),
exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), exact_word_docids: HashMapMerger::new(),
word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), word_position_docids: HashMapMerger::new(),
fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps), fid_word_count_docids: HashMapMerger::new(),
} }
} }
@ -421,49 +370,44 @@ impl WordDocidsMergerBuilders {
current_docid: _, current_docid: _,
} = other; } = other;
let mut word_fid_docids_readers = Ok(vec![]); let mut word_fid_docids_readers = HashMap::new();
let mut word_docids_readers = Ok(vec![]); let mut word_docids_readers = HashMap::new();
let mut exact_word_docids_readers = Ok(vec![]); let mut exact_word_docids_readers = HashMap::new();
let mut word_position_docids_readers = Ok(vec![]); let mut word_position_docids_readers = HashMap::new();
let mut fid_word_count_docids_readers = Ok(vec![]); let mut fid_word_count_docids_readers = HashMap::new();
rayon::scope(|s| { rayon::scope(|s| {
s.spawn(|_| { s.spawn(|_| {
word_fid_docids_readers = word_fid_docids_readers = word_fid_docids.into_sorter();
word_fid_docids.into_sorter().and_then(|s| s.into_reader_cursors());
}); });
s.spawn(|_| { s.spawn(|_| {
word_docids_readers = word_docids_readers = word_docids.into_sorter();
word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
}); });
s.spawn(|_| { s.spawn(|_| {
exact_word_docids_readers = exact_word_docids_readers = exact_word_docids.into_sorter();
exact_word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
}); });
s.spawn(|_| { s.spawn(|_| {
word_position_docids_readers = word_position_docids_readers = word_position_docids.into_sorter();
word_position_docids.into_sorter().and_then(|s| s.into_reader_cursors());
}); });
s.spawn(|_| { s.spawn(|_| {
fid_word_count_docids_readers = fid_word_count_docids_readers = fid_word_count_docids.into_sorter();
fid_word_count_docids.into_sorter().and_then(|s| s.into_reader_cursors());
}); });
}); });
self.word_fid_docids.extend(word_fid_docids_readers?); self.word_fid_docids.extend([word_fid_docids_readers]);
self.word_docids.extend(word_docids_readers?); self.word_docids.extend([word_docids_readers]);
self.exact_word_docids.extend(exact_word_docids_readers?); self.exact_word_docids.extend([exact_word_docids_readers]);
self.word_position_docids.extend(word_position_docids_readers?); self.word_position_docids.extend([word_position_docids_readers]);
self.fid_word_count_docids.extend(fid_word_count_docids_readers?); self.fid_word_count_docids.extend([fid_word_count_docids_readers]);
Ok(()) Ok(())
} }
fn build(self) -> WordDocidsMergers { fn build(self) -> WordDocidsMergers {
WordDocidsMergers { WordDocidsMergers {
word_fid_docids: self.word_fid_docids.build(), word_fid_docids: self.word_fid_docids,
word_docids: self.word_docids.build(), word_docids: self.word_docids,
exact_word_docids: self.exact_word_docids.build(), exact_word_docids: self.exact_word_docids,
word_position_docids: self.word_position_docids.build(), word_position_docids: self.word_position_docids,
fid_word_count_docids: self.fid_word_count_docids.build(), fid_word_count_docids: self.fid_word_count_docids,
} }
} }
} }

View File

@ -2,7 +2,6 @@ use std::collections::VecDeque;
use std::rc::Rc; use std::rc::Rc;
use heed::RoTxn; use heed::RoTxn;
use itertools::merge_join_by;
use obkv::KvReader; use obkv::KvReader;
use super::tokenize_document::DocumentTokenizer; use super::tokenize_document::DocumentTokenizer;
@ -10,7 +9,6 @@ use super::SearchableExtractor;
use crate::proximity::{index_proximity, MAX_DISTANCE}; use crate::proximity::{index_proximity, MAX_DISTANCE};
use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{FieldId, GlobalFieldsIdsMap, Index, Result}; use crate::{FieldId, GlobalFieldsIdsMap, Index, Result};
pub struct WordPairProximityDocidsExtractor; pub struct WordPairProximityDocidsExtractor;
@ -33,7 +31,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
index: &Index, index: &Index,
document_tokenizer: &DocumentTokenizer, document_tokenizer: &DocumentTokenizer,
fields_ids_map: &mut GlobalFieldsIdsMap, fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>, cached_sorter: &mut CboCachedSorter,
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()> { ) -> Result<()> {
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
@ -96,14 +94,14 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
del_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2); del_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2);
for ((w1, w2), prox) in del_word_pair_proximity.iter() { for ((w1, w2), prox) in del_word_pair_proximity.iter() {
let key = build_key(*prox, w1, w2, &mut key_buffer); let key = build_key(*prox, w1, w2, &mut key_buffer);
cached_sorter.insert_del_u32(key, docid)?; cached_sorter.insert_del_u32(key, docid);
} }
add_word_pair_proximity.sort_unstable(); add_word_pair_proximity.sort_unstable();
add_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2); add_word_pair_proximity.dedup_by(|(k1, _), (k2, _)| k1 == k2);
for ((w1, w2), prox) in add_word_pair_proximity.iter() { for ((w1, w2), prox) in add_word_pair_proximity.iter() {
let key = build_key(*prox, w1, w2, &mut key_buffer); let key = build_key(*prox, w1, w2, &mut key_buffer);
cached_sorter.insert_add_u32(key, docid)?; cached_sorter.insert_add_u32(key, docid);
} }
Ok(()) Ok(())
} }

View File

@ -13,9 +13,9 @@ use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::cache::CboCachedSorter; use super::cache::CboCachedSorter;
use super::DocidsExtractor; use super::{DocidsExtractor, HashMapMerger};
use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::new::{DocumentChange, ItemsPool};
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
pub trait SearchableExtractor { pub trait SearchableExtractor {
@ -24,7 +24,7 @@ pub trait SearchableExtractor {
fields_ids_map: &GlobalFieldsIdsMap, fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters, indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { ) -> Result<HashMapMerger> {
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
@ -60,18 +60,7 @@ pub trait SearchableExtractor {
index.read_txn()?, index.read_txn()?,
&document_tokenizer, &document_tokenizer,
fields_ids_map.clone(), fields_ids_map.clone(),
CboCachedSorter::new( CboCachedSorter::new(),
// TODO use a better value
1_000_000.try_into().unwrap(),
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
),
)) ))
}); });
@ -93,7 +82,7 @@ pub trait SearchableExtractor {
})?; })?;
} }
{ {
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); let mut builder = HashMapMerger::new();
let span = let span =
tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter(); let _entered = span.enter();
@ -102,14 +91,11 @@ pub trait SearchableExtractor {
.into_items() .into_items()
.par_bridge() .par_bridge()
.map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
let sorter = cached_sorter.into_sorter()?; cached_sorter.into_sorter()
sorter.into_reader_cursors()
}) })
.collect(); .collect();
for reader in readers { builder.extend(readers);
builder.extend(reader?); Ok(builder)
}
Ok(builder.build())
} }
} }
@ -118,7 +104,7 @@ pub trait SearchableExtractor {
index: &Index, index: &Index,
document_tokenizer: &DocumentTokenizer, document_tokenizer: &DocumentTokenizer,
fields_ids_map: &mut GlobalFieldsIdsMap, fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>, cached_sorter: &mut CboCachedSorter,
document_change: DocumentChange, document_change: DocumentChange,
) -> Result<()>; ) -> Result<()>;
@ -134,7 +120,7 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
fields_ids_map: &GlobalFieldsIdsMap, fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters, indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { ) -> Result<HashMapMerger> {
Self::run_extraction(index, fields_ids_map, indexer, document_changes) Self::run_extraction(index, fields_ids_map, indexer, document_changes)
} }
} }

View File

@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use heed::types::{Bytes, DecodeIgnore}; use heed::types::Bytes;
use heed::RoTxn; use heed::RoTxn;
use memmap2::Mmap; use memmap2::Mmap;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};

View File

@ -8,7 +8,7 @@ use heed::{Database, RoTxn};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::channel::*; use super::channel::*;
use super::extract::FacetKind; use super::extract::{FacetKind, HashMapMerger};
use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update};
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation; use crate::update::new::channel::MergerOperation;
@ -238,21 +238,16 @@ impl GeoExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_docids( fn merge_and_send_docids(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>, merger: HashMapMerger,
database: Database<Bytes, Bytes>, database: Database<Bytes, Bytes>,
rtxn: &RoTxn<'_>, rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender, docids_sender: impl DocidsSender,
mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>, mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>,
) -> Result<()> { ) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap(); for (key, deladd) in merger.iter() {
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?; let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into(); match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(key, value).unwrap(); docids_sender.write(key, value).unwrap();
@ -271,20 +266,15 @@ fn merge_and_send_docids(
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
fn merge_and_send_facet_docids( fn merge_and_send_facet_docids(
merger: Merger<File, MergeDeladdCboRoaringBitmaps>, merger: HashMapMerger,
database: FacetDatabases, database: FacetDatabases,
rtxn: &RoTxn<'_>, rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender, docids_sender: impl DocidsSender,
) -> Result<()> { ) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap(); for (key, deladd) in merger.iter() {
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?; let current = database.get(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into(); match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(key, value).unwrap(); docids_sender.write(key, value).unwrap();
@ -348,12 +338,12 @@ enum Operation {
/// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap. /// A function that merges the DelAdd CboRoaringBitmaps with the current bitmap.
fn merge_cbo_bitmaps( fn merge_cbo_bitmaps(
current: Option<&[u8]>, current: Option<&[u8]>,
del: Option<&[u8]>, del: Option<RoaringBitmap>,
add: Option<&[u8]>, add: Option<RoaringBitmap>,
) -> Result<Operation> { ) -> Result<Operation> {
let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; let current = current.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
let del = del.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; // let del = del.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
let add = add.map(CboRoaringBitmapCodec::deserialize_from).transpose()?; // let add = add.map(CboRoaringBitmapCodec::deserialize_from).transpose()?;
match (current, del, add) { match (current, del, add) {
(None, None, None) => Ok(Operation::Ignore), // but it's strange (None, None, None) => Ok(Operation::Ignore), // but it's strange