diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 9b05c7ce4..98538ea9e 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -6,6 +6,7 @@ use grenad::Merger; use heed::types::Bytes; use memmap2::Mmap; +use super::extract::FacetKind; use super::StdResult; use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY}; use crate::update::new::KvReaderFieldId; @@ -120,11 +121,16 @@ pub enum Database { WordFidDocids, WordPairProximityDocids, WordPositionDocids, + FacetIdIsNullDocids, + FacetIdIsEmptyDocids, + FacetIdExistsDocids, + FacetIdF64NumberDocids, + FacetIdStringDocids, } -impl WriterOperation { +impl Database { pub fn database(&self, index: &Index) -> heed::Database { - match self.database { + match self { Database::Documents => index.documents.remap_types(), Database::ExactWordDocids => index.exact_word_docids.remap_types(), Database::Main => index.main.remap_types(), @@ -133,8 +139,19 @@ impl WriterOperation { Database::WordPositionDocids => index.word_position_docids.remap_types(), Database::FidWordCountDocids => index.field_id_word_count_docids.remap_types(), Database::WordPairProximityDocids => index.word_pair_proximity_docids.remap_types(), + Database::FacetIdIsNullDocids => index.facet_id_is_null_docids.remap_types(), + Database::FacetIdIsEmptyDocids => index.facet_id_is_empty_docids.remap_types(), + Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(), + Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(), + Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(), } } +} + +impl WriterOperation { + pub fn database(&self, index: &Index) -> heed::Database { + self.database.database(index) + } pub fn entry(self) -> EntryOperation { self.entry @@ -159,8 +176,12 @@ impl MergerSender { MainSender(&self.0) } - pub fn docids(&self) -> DocidsSender<'_, D> { - DocidsSender { sender: &self.0, _marker: PhantomData } + pub fn 
docids(&self) -> WordDocidsSender<'_, D> { + WordDocidsSender { sender: &self.0, _marker: PhantomData } + } + + pub fn facet_docids(&self) -> FacetDocidsSender<'_> { + FacetDocidsSender { sender: &self.0 } } pub fn documents(&self) -> DocumentsSender<'_> { @@ -208,16 +229,21 @@ pub enum WordDocids {} pub enum WordFidDocids {} pub enum WordPairProximityDocids {} pub enum WordPositionDocids {} +pub enum FacetDocids {} pub trait DatabaseType { const DATABASE: Database; +} +pub trait MergerOperationType { fn new_merger_operation(merger: Merger) -> MergerOperation; } impl DatabaseType for ExactWordDocids { const DATABASE: Database = Database::ExactWordDocids; +} +impl MergerOperationType for ExactWordDocids { fn new_merger_operation(merger: Merger) -> MergerOperation { MergerOperation::ExactWordDocidsMerger(merger) } @@ -225,7 +251,9 @@ impl DatabaseType for ExactWordDocids { impl DatabaseType for FidWordCountDocids { const DATABASE: Database = Database::FidWordCountDocids; +} +impl MergerOperationType for FidWordCountDocids { fn new_merger_operation(merger: Merger) -> MergerOperation { MergerOperation::FidWordCountDocidsMerger(merger) } @@ -233,7 +261,9 @@ impl DatabaseType for FidWordCountDocids { impl DatabaseType for WordDocids { const DATABASE: Database = Database::WordDocids; +} +impl MergerOperationType for WordDocids { fn new_merger_operation(merger: Merger) -> MergerOperation { MergerOperation::WordDocidsMerger(merger) } @@ -241,7 +271,9 @@ impl DatabaseType for WordDocids { impl DatabaseType for WordFidDocids { const DATABASE: Database = Database::WordFidDocids; +} +impl MergerOperationType for WordFidDocids { fn new_merger_operation(merger: Merger) -> MergerOperation { MergerOperation::WordFidDocidsMerger(merger) } @@ -249,7 +281,9 @@ impl DatabaseType for WordFidDocids { impl DatabaseType for WordPairProximityDocids { const DATABASE: Database = Database::WordPairProximityDocids; +} +impl MergerOperationType for WordPairProximityDocids { fn 
new_merger_operation(merger: Merger) -> MergerOperation { MergerOperation::WordPairProximityDocidsMerger(merger) } @@ -257,19 +291,32 @@ impl DatabaseType for WordPairProximityDocids { impl DatabaseType for WordPositionDocids { const DATABASE: Database = Database::WordPositionDocids; +} +impl MergerOperationType for WordPositionDocids { fn new_merger_operation(merger: Merger) -> MergerOperation { MergerOperation::WordPositionDocidsMerger(merger) } } -pub struct DocidsSender<'a, D> { +impl MergerOperationType for FacetDocids { + fn new_merger_operation(merger: Merger) -> MergerOperation { + MergerOperation::FacetDocidsMerger(merger) + } +} + +pub trait DocidsSender { + fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>; + fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>; +} + +pub struct WordDocidsSender<'a, D> { sender: &'a Sender, _marker: PhantomData, } -impl DocidsSender<'_, D> { - pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { +impl DocidsSender for WordDocidsSender<'_, D> { + fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); match self.sender.send(WriterOperation { database: D::DATABASE, entry }) { Ok(()) => Ok(()), @@ -277,7 +324,7 @@ impl DocidsSender<'_, D> { } } - pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { + fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Delete(KeyEntry::from_key(key)); match self.sender.send(WriterOperation { database: D::DATABASE, entry }) { Ok(()) => Ok(()), @@ -286,6 +333,43 @@ impl DocidsSender<'_, D> { } } +pub struct FacetDocidsSender<'a> { + sender: &'a Sender, +} + +impl DocidsSender for FacetDocidsSender<'_> { + fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { + let (database, key) = self.extract_database(key); + let entry = 
EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); + match self.sender.send(WriterOperation { database, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { + let (database, key) = self.extract_database(key); + let entry = EntryOperation::Delete(KeyEntry::from_key(key)); + match self.sender.send(WriterOperation { database, entry }) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } +} + +impl FacetDocidsSender<'_> { + fn extract_database<'a>(&self, key: &'a [u8]) -> (Database, &'a [u8]) { + let database = match FacetKind::from(key[0]) { + FacetKind::Number => Database::FacetIdF64NumberDocids, + FacetKind::String => Database::FacetIdStringDocids, + FacetKind::Null => Database::FacetIdIsNullDocids, + FacetKind::Empty => Database::FacetIdIsEmptyDocids, + FacetKind::Exists => Database::FacetIdExistsDocids, + }; + (database, &key[1..]) + } +} + pub struct DocumentsSender<'a>(&'a Sender); impl DocumentsSender<'_> { @@ -321,6 +405,7 @@ pub enum MergerOperation { WordFidDocidsMerger(Merger), WordPairProximityDocidsMerger(Merger), WordPositionDocidsMerger(Merger), + FacetDocidsMerger(Merger), DeleteDocument { docid: DocumentId }, InsertDocument { docid: DocumentId, document: Box }, FinishedDocument, @@ -344,7 +429,7 @@ impl ExtractorSender { DocumentSender(&self.0) } - pub fn send_searchable( + pub fn send_searchable( &self, merger: Merger, ) -> StdResult<(), SendError<()>> { diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 9471c753b..41bce2215 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,61 +1,180 @@ use std::collections::HashSet; +use std::fmt::Debug; +use std::fs::File; +use grenad::{MergeFunction, Merger}; use heed::RoTxn; +use rayon::iter::{IntoParallelIterator, 
ParallelBridge, ParallelIterator}; use serde_json::Value; -use super::FacetedExtractor; +use super::super::cache::CboCachedSorter; +use super::facet_document::extract_document_facets; +use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; -use crate::{normalize_facet, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; +use crate::update::new::extract::DocidsExtractor; +use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; +use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH}; +pub struct FacetedDocidsExtractor; -pub struct FieldIdFacetNumberDocidsExtractor; +impl FacetedDocidsExtractor { + fn extract_document_change( + rtxn: &RoTxn, + index: &Index, + buffer: &mut Vec, + fields_ids_map: &mut GlobalFieldsIdsMap, + attributes_to_extract: &[&str], + cached_sorter: &mut CboCachedSorter, + document_change: DocumentChange, + ) -> Result<()> { + match document_change { + DocumentChange::Deletion(inner) => extract_document_facets( + attributes_to_extract, + inner.current(rtxn, index)?.unwrap(), + fields_ids_map, + &mut |fid, value| { + Self::facet_fn_with_options( + buffer, + cached_sorter, + CboCachedSorter::insert_del_u32, + inner.docid(), + fid, + value, + ) + }, + ), + DocumentChange::Update(inner) => { + extract_document_facets( + attributes_to_extract, + inner.current(rtxn, index)?.unwrap(), + fields_ids_map, + &mut |fid, value| { + Self::facet_fn_with_options( + buffer, + cached_sorter, + CboCachedSorter::insert_del_u32, + inner.docid(), + fid, + value, + ) + }, + )?; + + extract_document_facets( + attributes_to_extract, + inner.new(), + fields_ids_map, + &mut |fid, value| { + Self::facet_fn_with_options( + buffer, + cached_sorter, + CboCachedSorter::insert_add_u32, + inner.docid(), + fid, + value, + ) + }, + ) + } + DocumentChange::Insertion(inner) => extract_document_facets( + attributes_to_extract, + inner.new(), + 
fields_ids_map, + &mut |fid, value| { + Self::facet_fn_with_options( + buffer, + cached_sorter, + CboCachedSorter::insert_add_u32, + inner.docid(), + fid, + value, + ) + }, + ), + } + } + + fn facet_fn_with_options( + buffer: &mut Vec, + cached_sorter: &mut CboCachedSorter, + cache_fn: impl Fn(&mut CboCachedSorter, &[u8], u32) -> grenad::Result<(), MF::Error>, + docid: DocumentId, + fid: FieldId, + value: &Value, + ) -> Result<()> + where + MF: MergeFunction, + MF::Error: Debug, + grenad::Error: Into, + { + // Exists + // key: fid + buffer.clear(); + buffer.push(FacetKind::Exists as u8); + buffer.extend_from_slice(&fid.to_be_bytes()); + cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)?; + + match value { + // Number + // key: fid - level - orderedf64 - originalf64 + Value::Number(number) => { + if let Some((n, ordered)) = + number.as_f64().and_then(|n| f64_into_bytes(n).map(|ordered| (n, ordered))) + { + buffer.clear(); + buffer.push(FacetKind::Number as u8); + buffer.extend_from_slice(&fid.to_be_bytes()); + buffer.push(1); // level 0 + buffer.extend_from_slice(&ordered); + buffer.extend_from_slice(&n.to_be_bytes()); + + cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + } else { + Ok(()) + } + } + // String + // key: fid - level - truncated_string + Value::String(s) => { + let truncated = truncate_str(s); + buffer.clear(); + buffer.push(FacetKind::String as u8); + buffer.extend_from_slice(&fid.to_be_bytes()); + buffer.push(1); // level 0 + buffer.extend_from_slice(truncated.as_bytes()); + cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + } + // Null + // key: fid + Value::Null => { + buffer.clear(); + buffer.push(FacetKind::Null as u8); + buffer.extend_from_slice(&fid.to_be_bytes()); + cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + } + // Empty + // key: fid + Value::Array(a) if a.is_empty() => { + buffer.clear(); + buffer.push(FacetKind::Empty as u8); + buffer.extend_from_slice(&fid.to_be_bytes()); + 
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + } + Value::Object(o) if o.is_empty() => { + buffer.clear(); + buffer.push(FacetKind::Empty as u8); + buffer.extend_from_slice(&fid.to_be_bytes()); + cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into) + } + // Otherwise, do nothing + // TODO: What about Value::Bool? + _ => Ok(()), + } + } -impl FacetedExtractor for FieldIdFacetNumberDocidsExtractor { fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result> { index.user_defined_faceted_fields(rtxn) } - - fn build_key<'b>( - field_id: FieldId, - value: &Value, - output: &'b mut Vec, - ) -> Option<&'b [u8]> { - let number = value.as_number()?; - let n = number.as_f64()?; - let ordered = f64_into_bytes(n)?; - - // fid - level - orderedf64 - orignalf64 - output.extend_from_slice(&field_id.to_be_bytes()); - output.push(1); // level 0 - output.extend_from_slice(&ordered); - output.extend_from_slice(&n.to_be_bytes()); - - Some(&*output) - } -} - -pub struct FieldIdFacetStringDocidsExtractor; - -impl FacetedExtractor for FieldIdFacetStringDocidsExtractor { - fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result> { - index.user_defined_faceted_fields(rtxn) - } - - fn build_key<'b>( - field_id: FieldId, - value: &Value, - output: &'b mut Vec, - ) -> Option<&'b [u8]> { - let string = value.as_str()?; - let normalize = normalize_facet(string); - let truncated = truncate_str(&normalize); - - // fid - level - normalized string - output.extend_from_slice(&field_id.to_be_bytes()); - output.push(1); // level 0 - output.extend_from_slice(truncated.as_bytes()); - - Some(&*output) - } } /// Truncates a string to the biggest valid LMDB key size. 
@@ -70,68 +189,77 @@ fn truncate_str(s: &str) -> &str { &s[..index.unwrap_or(0)] } -pub struct FieldIdFacetIsNullDocidsExtractor; +impl DocidsExtractor for FacetedDocidsExtractor { + #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] + fn run_extraction( + index: &Index, + fields_ids_map: &GlobalFieldsIdsMap, + indexer: GrenadParameters, + document_changes: impl IntoParallelIterator>, + ) -> Result> { + let max_memory = indexer.max_memory_by_thread(); -impl FacetedExtractor for FieldIdFacetIsNullDocidsExtractor { - fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result> { - index.user_defined_faceted_fields(rtxn) - } + let rtxn = index.read_txn()?; + let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; + let attributes_to_extract: Vec<_> = + attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - fn build_key<'b>( - field_id: FieldId, - value: &Value, - output: &'b mut Vec, - ) -> Option<&'b [u8]> { - if value.is_null() { - output.extend_from_slice(&field_id.to_be_bytes()); - Some(&*output) - } else { - None - } - } -} - -pub struct FieldIdFacetExistsDocidsExtractor; - -impl FacetedExtractor for FieldIdFacetExistsDocidsExtractor { - fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result> { - index.user_defined_faceted_fields(rtxn) - } - - fn build_key<'b>( - field_id: FieldId, - _value: &Value, - output: &'b mut Vec, - ) -> Option<&'b [u8]> { - output.extend_from_slice(&field_id.to_be_bytes()); - Some(&*output) - } -} - -pub struct FieldIdFacetIsEmptyDocidsExtractor; - -impl FacetedExtractor for FieldIdFacetIsEmptyDocidsExtractor { - fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result> { - index.user_defined_faceted_fields(rtxn) - } - - fn build_key<'b>( - field_id: FieldId, - value: &Value, - output: &'b mut Vec, - ) -> Option<&'b [u8]> { - let is_empty = match value { - Value::Null | Value::Bool(_) | Value::Number(_) => false, - 
Value::String(s) => s.is_empty(), - Value::Array(a) => a.is_empty(), - Value::Object(o) => o.is_empty(), - }; - - if is_empty { - output.extend_from_slice(&field_id.to_be_bytes()); - Some(&*output) - } else { - None + let context_pool = ItemsPool::new(|| { + Ok(( + index.read_txn()?, + fields_ids_map.clone(), + Vec::new(), + CboCachedSorter::new( + // TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + ), + )) + }); + + { + let span = + tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); + let _entered = span.enter(); + document_changes.into_par_iter().try_for_each(|document_change| { + context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { + Self::extract_document_change( + &*rtxn, + index, + buffer, + fields_ids_map, + &attributes_to_extract, + cached_sorter, + document_change?, + ) + }) + })?; + } + { + let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); + let span = + tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); + let _entered = span.enter(); + + let readers: Vec<_> = context_pool + .into_items() + .par_bridge() + .map(|(_rtxn, _fields_ids_map, _buffer, cached_sorter)| { + let sorter = cached_sorter.into_sorter()?; + sorter.into_reader_cursors() + }) + .collect(); + for reader in readers { + builder.extend(reader?); + } + Ok(builder.build()) } } } diff --git a/milli/src/update/new/extract/faceted/mod.rs b/milli/src/update/new/extract/faceted/mod.rs index b4d6b4131..a59c64d9a 100644 --- a/milli/src/update/new/extract/faceted/mod.rs +++ b/milli/src/update/new/extract/faceted/mod.rs @@ -1,180 +1,26 @@ -use std::collections::HashSet; -use std::fmt::Debug; -use std::fs::File; - -pub use extract_facets::*; -use grenad::{MergeFunction, Merger}; -use heed::RoTxn; -use 
rayon::iter::{IntoParallelIterator, ParallelIterator}; -use serde_json::Value; - -use super::cache::CboCachedSorter; -use crate::update::new::{DocumentChange, ItemsPool}; -use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result}; - mod extract_facets; mod facet_document; -pub trait FacetedExtractor { - #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] - fn run_extraction( - index: &Index, - fields_ids_map: &GlobalFieldsIdsMap, - indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, - ) -> Result> { - let max_memory = indexer.max_memory_by_thread(); +pub use extract_facets::FacetedDocidsExtractor; - let rtxn = index.read_txn()?; - let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; - let attributes_to_extract: Vec<_> = - attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - - let context_pool = ItemsPool::new(|| { - Ok(( - index.read_txn()?, - fields_ids_map.clone(), - Vec::new(), - CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); - - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { - Self::extract_document_change( - &*rtxn, - index, - buffer, - fields_ids_map, - &attributes_to_extract, - cached_sorter, - document_change?, - ) - }) - })?; - - let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); - for (_rtxn, _fields_ids_map, _buffer, cache) in context_pool.into_items() { - let sorter = cache.into_sorter()?; - let readers = sorter.into_reader_cursors()?; - builder.extend(readers); - } - - Ok(builder.build()) - } - - // TODO Shorten this - fn 
facet_fn_with_options( - buffer: &mut Vec, - cached_sorter: &mut CboCachedSorter, - cache_fn: impl Fn(&mut CboCachedSorter, &[u8], u32) -> grenad::Result<(), MF::Error>, - docid: DocumentId, - fid: FieldId, - value: &Value, - ) -> Result<()> - where - MF: MergeFunction, - MF::Error: Debug, - grenad::Error: Into, - { - buffer.clear(); - match Self::build_key(fid, value, buffer) { - Some(key) => cache_fn(cached_sorter, &key, docid).map_err(Into::into), - None => Ok(()), - } - } - - fn extract_document_change( - rtxn: &RoTxn, - index: &Index, - buffer: &mut Vec, - fields_ids_map: &mut GlobalFieldsIdsMap, - attributes_to_extract: &[&str], - cached_sorter: &mut CboCachedSorter, - document_change: DocumentChange, - ) -> Result<()> { - match document_change { - DocumentChange::Deletion(inner) => facet_document::extract_document_facets( - attributes_to_extract, - inner.current(rtxn, index)?.unwrap(), - fields_ids_map, - &mut |fid, value| { - Self::facet_fn_with_options( - buffer, - cached_sorter, - CboCachedSorter::insert_del_u32, - inner.docid(), - fid, - value, - ) - }, - ), - DocumentChange::Update(inner) => { - facet_document::extract_document_facets( - attributes_to_extract, - inner.current(rtxn, index)?.unwrap(), - fields_ids_map, - &mut |fid, value| { - Self::facet_fn_with_options( - buffer, - cached_sorter, - CboCachedSorter::insert_del_u32, - inner.docid(), - fid, - value, - ) - }, - )?; - - facet_document::extract_document_facets( - attributes_to_extract, - inner.new(), - fields_ids_map, - &mut |fid, value| { - Self::facet_fn_with_options( - buffer, - cached_sorter, - CboCachedSorter::insert_add_u32, - inner.docid(), - fid, - value, - ) - }, - ) - } - DocumentChange::Insertion(inner) => facet_document::extract_document_facets( - attributes_to_extract, - inner.new(), - fields_ids_map, - &mut |fid, value| { - Self::facet_fn_with_options( - buffer, - cached_sorter, - CboCachedSorter::insert_add_u32, - inner.docid(), - fid, - value, - ) - }, - ), - } - } - - // TODO 
avoid owning the strings here. - fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result>; - - fn build_key<'b>(field_id: FieldId, value: &Value, output: &'b mut Vec) - -> Option<&'b [u8]>; +#[repr(u8)] +pub enum FacetKind { + Number = 0, + String = 1, + Null = 2, + Empty = 3, + Exists, +} + +impl From for FacetKind { + fn from(value: u8) -> Self { + match value { + 0 => Self::Number, + 1 => Self::String, + 2 => Self::Null, + 3 => Self::Empty, + 4 => Self::Exists, + _ => unreachable!(), + } + } } diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index d6d5a3005..3836f9957 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -2,9 +2,29 @@ mod cache; mod faceted; mod searchable; +use std::fs::File; + pub use faceted::*; +use grenad::Merger; +use rayon::iter::IntoParallelIterator; pub use searchable::*; +use crate::{ + update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}, + GlobalFieldsIdsMap, Index, Result, +}; + +use super::DocumentChange; + +pub trait DocidsExtractor { + fn run_extraction( + index: &Index, + fields_ids_map: &GlobalFieldsIdsMap, + indexer: GrenadParameters, + document_changes: impl IntoParallelIterator>, + ) -> Result>; +} + /// TODO move in permissive json pointer pub mod perm_json_p { use serde_json::{Map, Value}; @@ -39,6 +59,10 @@ pub mod perm_json_p { base_key: &str, seeker: &mut impl FnMut(&str, &Value) -> Result<()>, ) -> Result<()> { + if value.is_empty() { + seeker(&base_key, &Value::Object(Map::with_capacity(0)))?; + } + for (key, value) in value.iter() { let base_key = if base_key.is_empty() { key.to_string() @@ -80,6 +104,10 @@ pub mod perm_json_p { base_key: &str, seeker: &mut impl FnMut(&str, &Value) -> Result<()>, ) -> Result<()> { + if values.is_empty() { + seeker(&base_key, &Value::Array(vec![]))?; + } + for value in values { match value { Value::Object(object) => { diff --git a/milli/src/update/new/extract/searchable/mod.rs 
b/milli/src/update/new/extract/searchable/mod.rs index 7e096591e..48d373598 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -17,6 +17,7 @@ use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; +use super::DocidsExtractor; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; @@ -130,3 +131,14 @@ pub trait SearchableExtractor { fn attributes_to_skip<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result>; } + +impl DocidsExtractor for T { + fn run_extraction( + index: &Index, + fields_ids_map: &GlobalFieldsIdsMap, + indexer: GrenadParameters, + document_changes: impl IntoParallelIterator>, + ) -> Result> { + Self::run_extraction(index, fields_ids_map, indexer, document_changes) + } +} diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 7350d9499..b40ddbc4d 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -101,6 +101,22 @@ where max_memory: Some(max_memory), ..GrenadParameters::default() }; + + { + let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); + let _entered = span.enter(); + extract_and_send_docids::< + FacetedDocidsExtractor, + FacetDocids, + >( + index, + &global_fields_ids_map, + grenad_parameters, + document_changes.clone(), + &extractor_sender, + )?; + } + { let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); let _entered = span.enter(); @@ -176,19 +192,19 @@ where } // TODO THIS IS TOO MUCH - // Extract fieldid docid facet number - // Extract fieldid docid facet string - // Extract facetid string fst - // Extract facetid normalized string strings + // - [ ] Extract 
fieldid docid facet number + // - [ ] Extract fieldid docid facet string + // - [ ] Extract facetid string fst + // - [ ] Extract facetid normalized string strings // TODO Inverted Indexes again - // Extract fieldid facet isempty docids - // Extract fieldid facet isnull docids - // Extract fieldid facet exists docids + // - [x] Extract fieldid facet isempty docids + // - [x] Extract fieldid facet isnull docids + // - [x] Extract fieldid facet exists docids // TODO This is the normal system - // Extract fieldid facet number docids - // Extract fieldid facet string docids + // - [x] Extract fieldid facet number docids + // - [x] Extract fieldid facet string docids Ok(()) as Result<_> }) @@ -238,7 +254,7 @@ where /// TODO: GrenadParameters::default() should be removed in favor a passed parameter /// TODO: manage the errors correctly /// TODO: we must have a single trait that also gives the extractor type -fn extract_and_send_docids( +fn extract_and_send_docids( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 291f79216..9ba81fb11 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -12,6 +12,7 @@ use tempfile::tempfile; use super::channel::*; use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update}; +use super::extract::FacetKind; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; use crate::update::MergeDeladdCboRoaringBitmaps; @@ -63,26 +64,33 @@ pub fn merge_grenad_entries( )?; } MergerOperation::WordDocidsMerger(merger) => { - let span = - tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); - let _entered = span.enter(); let mut add_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?; let mut del_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?; + { + let span = + tracing::trace_span!(target: 
"indexing::documents::merge", "word_docids"); + let _entered = span.enter(); - merge_and_send_docids( - merger, - index.word_docids.remap_types(), - rtxn, - &mut buffer, - sender.docids::(), - |key| add_words_fst.insert(key), - |key| del_words_fst.insert(key), - )?; + merge_and_send_docids( + merger, + index.word_docids.remap_types(), + rtxn, + &mut buffer, + sender.docids::(), + |key| add_words_fst.insert(key), + |key| del_words_fst.insert(key), + )?; + } - // Move that into a dedicated function - let words_fst = index.words_fst(rtxn)?; - let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?; - sender.main().write_words_fst(mmap).unwrap(); + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "words_fst"); + let _entered = span.enter(); + // Move that into a dedicated function + let words_fst = index.words_fst(rtxn)?; + let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?; + sender.main().write_words_fst(mmap).unwrap(); + } } MergerOperation::WordFidDocidsMerger(merger) => { let span = @@ -161,6 +169,18 @@ pub fn merge_grenad_entries( MergerOperation::FinishedDocument => { // send the rtree } + MergerOperation::FacetDocidsMerger(merger) => { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "facet_docids"); + let _entered = span.enter(); + merge_and_send_facet_docids( + merger, + FacetDatabases::new(index), + rtxn, + &mut buffer, + sender.facet_docids(), + )?; + } } } @@ -252,12 +272,12 @@ fn compute_new_words_fst( } #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] -fn merge_and_send_docids( +fn merge_and_send_docids( merger: Merger, database: Database, rtxn: &RoTxn<'_>, buffer: &mut Vec, - word_docids_sender: DocidsSender<'_, D>, + docids_sender: impl DocidsSender, mut add_key: impl FnMut(&[u8]) -> fst::Result<()>, mut del_key: impl FnMut(&[u8]) -> fst::Result<()>, ) -> Result<()> { @@ -271,11 +291,11 @@ fn merge_and_send_docids( match 
merge_cbo_bitmaps(current, del, add)? { Operation::Write(bitmap) => { let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); - word_docids_sender.write(key, value).unwrap(); + docids_sender.write(key, value).unwrap(); add_key(key)?; } Operation::Delete => { - word_docids_sender.delete(key).unwrap(); + docids_sender.delete(key).unwrap(); del_key(key)?; } Operation::Ignore => (), @@ -285,6 +305,76 @@ fn merge_and_send_docids( Ok(()) } +#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] +fn merge_and_send_facet_docids( + merger: Merger, + database: FacetDatabases, + rtxn: &RoTxn<'_>, + buffer: &mut Vec, + docids_sender: impl DocidsSender, +) -> Result<()> { + let mut merger_iter = merger.into_stream_merger_iter().unwrap(); + while let Some((key, deladd)) = merger_iter.next().unwrap() { + let current = database.get(rtxn, key)?; + let deladd: &KvReaderDelAdd = deladd.into(); + let del = deladd.get(DelAdd::Deletion); + let add = deladd.get(DelAdd::Addition); + + match merge_cbo_bitmaps(current, del, add)? { + Operation::Write(bitmap) => { + let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); + docids_sender.write(key, value).unwrap(); + } + Operation::Delete => { + docids_sender.delete(key).unwrap(); + } + Operation::Ignore => (), + } + } + + Ok(()) +} + +struct FacetDatabases { + /// Maps the facet field id and the docids for which this field exists + facet_id_exists_docids: Database, + /// Maps the facet field id and the docids for which this field is set as null + facet_id_is_null_docids: Database, + /// Maps the facet field id and the docids for which this field is considered empty + facet_id_is_empty_docids: Database, + /// Maps the facet field id and ranges of numbers with the docids that corresponds to them. + facet_id_f64_docids: Database, + /// Maps the facet field id and ranges of strings with the docids that corresponds to them. 
+ facet_id_string_docids: Database, +} + +impl FacetDatabases { + fn new(index: &Index) -> Self { + Self { + facet_id_exists_docids: index.facet_id_exists_docids.remap_types(), + facet_id_is_null_docids: index.facet_id_is_null_docids.remap_types(), + facet_id_is_empty_docids: index.facet_id_is_empty_docids.remap_types(), + facet_id_f64_docids: index.facet_id_f64_docids.remap_types(), + facet_id_string_docids: index.facet_id_string_docids.remap_types(), + } + } + + fn get<'a>(&self, rtxn: &'a RoTxn<'_>, key: &[u8]) -> heed::Result> { + let (facet_kind, key) = self.extract_facet_kind(key); + match facet_kind { + FacetKind::Exists => self.facet_id_exists_docids.get(rtxn, key), + FacetKind::Null => self.facet_id_is_null_docids.get(rtxn, key), + FacetKind::Empty => self.facet_id_is_empty_docids.get(rtxn, key), + FacetKind::Number => self.facet_id_f64_docids.get(rtxn, key), + FacetKind::String => self.facet_id_string_docids.get(rtxn, key), + } + } + + fn extract_facet_kind<'a>(&self, key: &'a [u8]) -> (FacetKind, &'a [u8]) { + (FacetKind::from(key[0]), &key[1..]) + } +} + enum Operation { Write(RoaringBitmap), Delete,