Extract and write facet databases

ManyTheFish 2024-09-16 09:34:10 +02:00
parent f7652186e1
commit 7ba49b849e
7 changed files with 526 additions and 321 deletions

View File

@@ -6,6 +6,7 @@ use grenad::Merger;
 use heed::types::Bytes;
 use memmap2::Mmap;
+use super::extract::FacetKind;
 use super::StdResult;
 use crate::index::main_key::{DOCUMENTS_IDS_KEY, WORDS_FST_KEY};
 use crate::update::new::KvReaderFieldId;
@@ -120,11 +121,16 @@ pub enum Database {
     WordFidDocids,
     WordPairProximityDocids,
     WordPositionDocids,
+    FacetIdIsNullDocids,
+    FacetIdIsEmptyDocids,
+    FacetIdExistsDocids,
+    FacetIdF64NumberDocids,
+    FacetIdStringDocids,
 }

-impl WriterOperation {
+impl Database {
     pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
-        match self.database {
+        match self {
             Database::Documents => index.documents.remap_types(),
             Database::ExactWordDocids => index.exact_word_docids.remap_types(),
             Database::Main => index.main.remap_types(),
@@ -133,8 +139,19 @@ impl WriterOperation {
             Database::WordPositionDocids => index.word_position_docids.remap_types(),
             Database::FidWordCountDocids => index.field_id_word_count_docids.remap_types(),
             Database::WordPairProximityDocids => index.word_pair_proximity_docids.remap_types(),
+            Database::FacetIdIsNullDocids => index.facet_id_is_null_docids.remap_types(),
+            Database::FacetIdIsEmptyDocids => index.facet_id_is_empty_docids.remap_types(),
+            Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
+            Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
+            Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
         }
     }
+}
+
+impl WriterOperation {
+    pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
+        self.database.database(index)
+    }

     pub fn entry(self) -> EntryOperation {
         self.entry
@@ -159,8 +176,12 @@ impl MergerSender {
         MainSender(&self.0)
     }

-    pub fn docids<D: DatabaseType>(&self) -> DocidsSender<'_, D> {
-        DocidsSender { sender: &self.0, _marker: PhantomData }
+    pub fn docids<D: DatabaseType>(&self) -> WordDocidsSender<'_, D> {
+        WordDocidsSender { sender: &self.0, _marker: PhantomData }
+    }
+
+    pub fn facet_docids(&self) -> FacetDocidsSender<'_> {
+        FacetDocidsSender { sender: &self.0 }
     }

     pub fn documents(&self) -> DocumentsSender<'_> {
@@ -208,16 +229,21 @@ pub enum WordDocids {}
 pub enum WordFidDocids {}
 pub enum WordPairProximityDocids {}
 pub enum WordPositionDocids {}
+pub enum FacetDocids {}

 pub trait DatabaseType {
     const DATABASE: Database;
+}

+pub trait MergerOperationType {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation;
 }

 impl DatabaseType for ExactWordDocids {
     const DATABASE: Database = Database::ExactWordDocids;
+}

+impl MergerOperationType for ExactWordDocids {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
         MergerOperation::ExactWordDocidsMerger(merger)
     }
@@ -225,7 +251,9 @@ impl DatabaseType for ExactWordDocids {
 impl DatabaseType for FidWordCountDocids {
     const DATABASE: Database = Database::FidWordCountDocids;
+}

+impl MergerOperationType for FidWordCountDocids {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
         MergerOperation::FidWordCountDocidsMerger(merger)
     }
@@ -233,7 +261,9 @@ impl DatabaseType for FidWordCountDocids {
 impl DatabaseType for WordDocids {
     const DATABASE: Database = Database::WordDocids;
+}

+impl MergerOperationType for WordDocids {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
         MergerOperation::WordDocidsMerger(merger)
     }
@@ -241,7 +271,9 @@ impl DatabaseType for WordDocids {
 impl DatabaseType for WordFidDocids {
     const DATABASE: Database = Database::WordFidDocids;
+}

+impl MergerOperationType for WordFidDocids {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
         MergerOperation::WordFidDocidsMerger(merger)
     }
@@ -249,7 +281,9 @@ impl DatabaseType for WordFidDocids {
 impl DatabaseType for WordPairProximityDocids {
     const DATABASE: Database = Database::WordPairProximityDocids;
+}

+impl MergerOperationType for WordPairProximityDocids {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
         MergerOperation::WordPairProximityDocidsMerger(merger)
     }
@@ -257,19 +291,32 @@ impl DatabaseType for WordPairProximityDocids {
 impl DatabaseType for WordPositionDocids {
     const DATABASE: Database = Database::WordPositionDocids;
+}

+impl MergerOperationType for WordPositionDocids {
     fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
         MergerOperation::WordPositionDocidsMerger(merger)
     }
 }

-pub struct DocidsSender<'a, D> {
+impl MergerOperationType for FacetDocids {
+    fn new_merger_operation(merger: Merger<File, MergeDeladdCboRoaringBitmaps>) -> MergerOperation {
+        MergerOperation::FacetDocidsMerger(merger)
+    }
+}
+
+pub trait DocidsSender {
+    fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>;
+
+    fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>;
+}
+
+pub struct WordDocidsSender<'a, D> {
     sender: &'a Sender<WriterOperation>,
     _marker: PhantomData<D>,
 }

-impl<D: DatabaseType> DocidsSender<'_, D> {
-    pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
+impl<D: DatabaseType> DocidsSender for WordDocidsSender<'_, D> {
+    fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
         let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
         match self.sender.send(WriterOperation { database: D::DATABASE, entry }) {
             Ok(()) => Ok(()),
@@ -277,7 +324,7 @@ impl<D: DatabaseType> DocidsSender<'_, D> {
         }
     }

-    pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+    fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
         let entry = EntryOperation::Delete(KeyEntry::from_key(key));
         match self.sender.send(WriterOperation { database: D::DATABASE, entry }) {
             Ok(()) => Ok(()),
@@ -286,6 +333,43 @@ impl<D: DatabaseType> DocidsSender<'_, D> {
     }
 }

+pub struct FacetDocidsSender<'a> {
+    sender: &'a Sender<WriterOperation>,
+}
+
+impl DocidsSender for FacetDocidsSender<'_> {
+    fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
+        let (database, key) = self.extract_database(key);
+        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
+        match self.sender.send(WriterOperation { database, entry }) {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+
+    fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+        let (database, key) = self.extract_database(key);
+        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
+        match self.sender.send(WriterOperation { database, entry }) {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+}
+
+impl FacetDocidsSender<'_> {
+    fn extract_database<'a>(&self, key: &'a [u8]) -> (Database, &'a [u8]) {
+        let database = match FacetKind::from(key[0]) {
+            FacetKind::Number => Database::FacetIdF64NumberDocids,
+            FacetKind::String => Database::FacetIdStringDocids,
+            FacetKind::Null => Database::FacetIdIsNullDocids,
+            FacetKind::Empty => Database::FacetIdIsEmptyDocids,
+            FacetKind::Exists => Database::FacetIdExistsDocids,
+        };
+        (database, &key[1..])
+    }
+}
+
 pub struct DocumentsSender<'a>(&'a Sender<WriterOperation>);

 impl DocumentsSender<'_> {
@@ -321,6 +405,7 @@ pub enum MergerOperation {
     WordFidDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
     WordPairProximityDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
     WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
+    FacetDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
     DeleteDocument { docid: DocumentId },
     InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> },
     FinishedDocument,
@@ -344,7 +429,7 @@ impl ExtractorSender {
         DocumentSender(&self.0)
     }

-    pub fn send_searchable<D: DatabaseType>(
+    pub fn send_searchable<D: MergerOperationType>(
         &self,
         merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
     ) -> StdResult<(), SendError<()>> {
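Note: with this change a facet entry no longer picks its database at the type level; every facet key carries its FacetKind as a one-byte tag, and FacetDocidsSender::extract_database strips that byte to choose among the five facet databases. A self-contained sketch of the routing idea (the string names stand in for the real heed database handles, which this sketch does not open):

// Standalone sketch of the one-byte routing used by FacetDocidsSender:
// the first byte of every facet key encodes the FacetKind, the rest is
// the real LMDB key.
fn route(key: &[u8]) -> (&'static str, &[u8]) {
    let database = match key[0] {
        0 => "facet_id_f64_docids",    // FacetKind::Number
        1 => "facet_id_string_docids", // FacetKind::String
        2 => "facet_id_is_null_docids",
        3 => "facet_id_is_empty_docids",
        4 => "facet_id_exists_docids",
        _ => unreachable!("unknown facet kind"),
    };
    (database, &key[1..])
}

fn main() {
    // A key tagged FacetKind::Exists (4) for field id 42 (FieldId is a u16).
    let mut key = vec![4u8];
    key.extend_from_slice(&42u16.to_be_bytes());
    let (database, rest) = route(&key);
    assert_eq!(database, "facet_id_exists_docids");
    assert_eq!(rest, &42u16.to_be_bytes()[..]);
}

This keeps the writer channel simple: one FacetDocidsSender covers all five facet databases instead of five generic sender instantiations.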

View File

@@ -1,61 +1,180 @@
 use std::collections::HashSet;
+use std::fmt::Debug;
+use std::fs::File;

+use grenad::{MergeFunction, Merger};
 use heed::RoTxn;
+use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
 use serde_json::Value;

-use super::FacetedExtractor;
+use super::super::cache::CboCachedSorter;
+use super::facet_document::extract_document_facets;
+use super::FacetKind;
 use crate::facet::value_encoding::f64_into_bytes;
-use crate::{normalize_facet, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
+use crate::update::new::extract::DocidsExtractor;
+use crate::update::new::{DocumentChange, ItemsPool};
+use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
+use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH};

-pub struct FieldIdFacetNumberDocidsExtractor;
+pub struct FacetedDocidsExtractor;

+impl FacetedDocidsExtractor {
+    fn extract_document_change(
+        rtxn: &RoTxn,
+        index: &Index,
+        buffer: &mut Vec<u8>,
+        fields_ids_map: &mut GlobalFieldsIdsMap,
+        attributes_to_extract: &[&str],
+        cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
+        document_change: DocumentChange,
+    ) -> Result<()> {
+        match document_change {
+            DocumentChange::Deletion(inner) => extract_document_facets(
+                attributes_to_extract,
+                inner.current(rtxn, index)?.unwrap(),
+                fields_ids_map,
+                &mut |fid, value| {
+                    Self::facet_fn_with_options(
+                        buffer,
+                        cached_sorter,
+                        CboCachedSorter::insert_del_u32,
+                        inner.docid(),
+                        fid,
+                        value,
+                    )
+                },
+            ),
+            DocumentChange::Update(inner) => {
+                extract_document_facets(
+                    attributes_to_extract,
+                    inner.current(rtxn, index)?.unwrap(),
+                    fields_ids_map,
+                    &mut |fid, value| {
+                        Self::facet_fn_with_options(
+                            buffer,
+                            cached_sorter,
+                            CboCachedSorter::insert_del_u32,
+                            inner.docid(),
+                            fid,
+                            value,
+                        )
+                    },
+                )?;
+
+                extract_document_facets(
+                    attributes_to_extract,
+                    inner.new(),
+                    fields_ids_map,
+                    &mut |fid, value| {
+                        Self::facet_fn_with_options(
+                            buffer,
+                            cached_sorter,
+                            CboCachedSorter::insert_add_u32,
+                            inner.docid(),
+                            fid,
+                            value,
+                        )
+                    },
+                )
+            }
+            DocumentChange::Insertion(inner) => extract_document_facets(
+                attributes_to_extract,
+                inner.new(),
+                fields_ids_map,
+                &mut |fid, value| {
+                    Self::facet_fn_with_options(
+                        buffer,
+                        cached_sorter,
+                        CboCachedSorter::insert_add_u32,
+                        inner.docid(),
+                        fid,
+                        value,
+                    )
+                },
+            ),
+        }
+    }
+
+    fn facet_fn_with_options<MF>(
+        buffer: &mut Vec<u8>,
+        cached_sorter: &mut CboCachedSorter<MF>,
+        cache_fn: impl Fn(&mut CboCachedSorter<MF>, &[u8], u32) -> grenad::Result<(), MF::Error>,
+        docid: DocumentId,
+        fid: FieldId,
+        value: &Value,
+    ) -> Result<()>
+    where
+        MF: MergeFunction,
+        MF::Error: Debug,
+        grenad::Error<MF::Error>: Into<crate::Error>,
+    {
+        // Exists
+        // key: fid
+        buffer.clear();
+        buffer.push(FacetKind::Exists as u8);
+        buffer.extend_from_slice(&fid.to_be_bytes());
+        cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)?;
+
+        match value {
+            // Number
+            // key: fid - level - ordered_f64 - original_f64
+            Value::Number(number) => {
+                if let Some((n, ordered)) =
+                    number.as_f64().and_then(|n| f64_into_bytes(n).map(|ordered| (n, ordered)))
+                {
+                    buffer.clear();
+                    buffer.push(FacetKind::Number as u8);
+                    buffer.extend_from_slice(&fid.to_be_bytes());
+                    buffer.push(1); // level 0
+                    buffer.extend_from_slice(&ordered);
+                    buffer.extend_from_slice(&n.to_be_bytes());
+                    cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)
+                } else {
+                    Ok(())
+                }
+            }
+            // String
+            // key: fid - level - truncated_string
+            Value::String(s) => {
+                let truncated = truncate_str(s);
+                buffer.clear();
+                buffer.push(FacetKind::String as u8);
+                buffer.extend_from_slice(&fid.to_be_bytes());
+                buffer.push(1); // level 0
+                buffer.extend_from_slice(truncated.as_bytes());
+                cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)
+            }
+            // Null
+            // key: fid
+            Value::Null => {
+                buffer.clear();
+                buffer.push(FacetKind::Null as u8);
+                buffer.extend_from_slice(&fid.to_be_bytes());
+                cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)
+            }
+            // Empty
+            // key: fid
+            Value::Array(a) if a.is_empty() => {
+                buffer.clear();
+                buffer.push(FacetKind::Empty as u8);
+                buffer.extend_from_slice(&fid.to_be_bytes());
+                cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)
+            }
+            Value::Object(o) if o.is_empty() => {
+                buffer.clear();
+                buffer.push(FacetKind::Empty as u8);
+                buffer.extend_from_slice(&fid.to_be_bytes());
+                cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)
+            }
+            // Otherwise, do nothing
+            // TODO: What about Value::Bool?
+            _ => Ok(()),
+        }
+    }

-impl FacetedExtractor for FieldIdFacetNumberDocidsExtractor {
     fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>> {
         index.user_defined_faceted_fields(rtxn)
     }
-
-    fn build_key<'b>(
-        field_id: FieldId,
-        value: &Value,
-        output: &'b mut Vec<u8>,
-    ) -> Option<&'b [u8]> {
-        let number = value.as_number()?;
-        let n = number.as_f64()?;
-        let ordered = f64_into_bytes(n)?;
-
-        // fid - level - orderedf64 - orignalf64
-        output.extend_from_slice(&field_id.to_be_bytes());
-        output.push(1); // level 0
-        output.extend_from_slice(&ordered);
-        output.extend_from_slice(&n.to_be_bytes());
-
-        Some(&*output)
-    }
-}
-
-pub struct FieldIdFacetStringDocidsExtractor;
-
-impl FacetedExtractor for FieldIdFacetStringDocidsExtractor {
-    fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>> {
-        index.user_defined_faceted_fields(rtxn)
-    }
-
-    fn build_key<'b>(
-        field_id: FieldId,
-        value: &Value,
-        output: &'b mut Vec<u8>,
-    ) -> Option<&'b [u8]> {
-        let string = value.as_str()?;
-        let normalize = normalize_facet(string);
-        let truncated = truncate_str(&normalize);
-
-        // fid - level - normalized string
-        output.extend_from_slice(&field_id.to_be_bytes());
-        output.push(1); // level 0
-        output.extend_from_slice(truncated.as_bytes());
-
-        Some(&*output)
-    }
 }

 /// Truncates a string to the biggest valid LMDB key size.
@@ -70,68 +189,77 @@ fn truncate_str(s: &str) -> &str {
     &s[..index.unwrap_or(0)]
 }

-pub struct FieldIdFacetIsNullDocidsExtractor;
+impl DocidsExtractor for FacetedDocidsExtractor {
+    #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
+    fn run_extraction(
+        index: &Index,
+        fields_ids_map: &GlobalFieldsIdsMap,
+        indexer: GrenadParameters,
+        document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
+    ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
+        let max_memory = indexer.max_memory_by_thread();

-impl FacetedExtractor for FieldIdFacetIsNullDocidsExtractor {
-    fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>> {
-        index.user_defined_faceted_fields(rtxn)
-    }
+        let rtxn = index.read_txn()?;
+        let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
+        let attributes_to_extract: Vec<_> =
+            attributes_to_extract.iter().map(|s| s.as_ref()).collect();

-    fn build_key<'b>(
-        field_id: FieldId,
-        value: &Value,
-        output: &'b mut Vec<u8>,
-    ) -> Option<&'b [u8]> {
-        if value.is_null() {
-            output.extend_from_slice(&field_id.to_be_bytes());
-            Some(&*output)
-        } else {
-            None
-        }
-    }
-}
-
-pub struct FieldIdFacetExistsDocidsExtractor;
-
-impl FacetedExtractor for FieldIdFacetExistsDocidsExtractor {
-    fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>> {
-        index.user_defined_faceted_fields(rtxn)
-    }
-
-    fn build_key<'b>(
-        field_id: FieldId,
-        _value: &Value,
-        output: &'b mut Vec<u8>,
-    ) -> Option<&'b [u8]> {
-        output.extend_from_slice(&field_id.to_be_bytes());
-        Some(&*output)
-    }
-}
-
-pub struct FieldIdFacetIsEmptyDocidsExtractor;
-
-impl FacetedExtractor for FieldIdFacetIsEmptyDocidsExtractor {
-    fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>> {
-        index.user_defined_faceted_fields(rtxn)
-    }
-
-    fn build_key<'b>(
-        field_id: FieldId,
-        value: &Value,
-        output: &'b mut Vec<u8>,
-    ) -> Option<&'b [u8]> {
-        let is_empty = match value {
-            Value::Null | Value::Bool(_) | Value::Number(_) => false,
-            Value::String(s) => s.is_empty(),
-            Value::Array(a) => a.is_empty(),
-            Value::Object(o) => o.is_empty(),
-        };
-
-        if is_empty {
-            output.extend_from_slice(&field_id.to_be_bytes());
-            Some(&*output)
-        } else {
-            None
-        }
+        let context_pool = ItemsPool::new(|| {
+            Ok((
+                index.read_txn()?,
+                fields_ids_map.clone(),
+                Vec::new(),
+                CboCachedSorter::new(
+                    // TODO use a better value
+                    100.try_into().unwrap(),
+                    create_sorter(
+                        grenad::SortAlgorithm::Stable,
+                        MergeDeladdCboRoaringBitmaps,
+                        indexer.chunk_compression_type,
+                        indexer.chunk_compression_level,
+                        indexer.max_nb_chunks,
+                        max_memory,
+                    ),
+                ),
+            ))
+        });
+
+        {
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
+            let _entered = span.enter();
+            document_changes.into_par_iter().try_for_each(|document_change| {
+                context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| {
+                    Self::extract_document_change(
+                        &*rtxn,
+                        index,
+                        buffer,
+                        fields_ids_map,
+                        &attributes_to_extract,
+                        cached_sorter,
+                        document_change?,
+                    )
+                })
+            })?;
+        }
+
+        {
+            let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
+            let span =
+                tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
+            let _entered = span.enter();
+
+            let readers: Vec<_> = context_pool
+                .into_items()
+                .par_bridge()
+                .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| {
+                    let sorter = cached_sorter.into_sorter()?;
+                    sorter.into_reader_cursors()
+                })
+                .collect();
+            for reader in readers {
+                builder.extend(reader?);
+            }
+            Ok(builder.build())
+        }
     }
 }
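Note: facet_fn_with_options always writes the Exists entry first and then at most one kind-specific entry, each under a key that starts with the FacetKind byte followed by the big-endian field id. A sketch of the layouts it builds, assuming FieldId is the u16 it is elsewhere in the crate and treating the f64_into_bytes output as an opaque 8-byte block:

// Sketch of the key layouts assembled above (kind byte, then the
// u16 field id in big-endian). The level byte is written as
// `buffer.push(1)` in the extractor.
fn number_key(fid: u16, n: f64, ordered: [u8; 8]) -> Vec<u8> {
    let mut key = vec![0u8]; // FacetKind::Number
    key.extend_from_slice(&fid.to_be_bytes());
    key.push(1); // level byte
    key.extend_from_slice(&ordered); // sortable encoding of the f64
    key.extend_from_slice(&n.to_be_bytes()); // original f64
    key
}

fn exists_key(fid: u16) -> Vec<u8> {
    let mut key = vec![4u8]; // FacetKind::Exists
    key.extend_from_slice(&fid.to_be_bytes());
    key
}

fn main() {
    // 1 (kind) + 2 (fid) + 1 (level) + 8 (ordered) + 8 (original) = 20 bytes.
    assert_eq!(number_key(3, 4.2, [0; 8]).len(), 20);
    // Exists/Null/Empty keys carry only the kind byte and the field id.
    assert_eq!(exists_key(3).len(), 3);
}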

View File

@@ -1,180 +1,26 @@
-use std::collections::HashSet;
-use std::fmt::Debug;
-use std::fs::File;
-
-pub use extract_facets::*;
-use grenad::{MergeFunction, Merger};
-use heed::RoTxn;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
-use serde_json::Value;
-
-use super::cache::CboCachedSorter;
-use crate::update::new::{DocumentChange, ItemsPool};
-use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
-use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result};
-
 mod extract_facets;
 mod facet_document;

-pub trait FacetedExtractor {
-    #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
-    fn run_extraction(
-        index: &Index,
-        fields_ids_map: &GlobalFieldsIdsMap,
-        indexer: GrenadParameters,
-        document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
-    ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
-        let max_memory = indexer.max_memory_by_thread();
-
-        let rtxn = index.read_txn()?;
-        let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
-        let attributes_to_extract: Vec<_> =
-            attributes_to_extract.iter().map(|s| s.as_ref()).collect();
-
-        let context_pool = ItemsPool::new(|| {
-            Ok((
-                index.read_txn()?,
-                fields_ids_map.clone(),
-                Vec::new(),
-                CboCachedSorter::new(
-                    // TODO use a better value
-                    100.try_into().unwrap(),
-                    create_sorter(
-                        grenad::SortAlgorithm::Stable,
-                        MergeDeladdCboRoaringBitmaps,
-                        indexer.chunk_compression_type,
-                        indexer.chunk_compression_level,
-                        indexer.max_nb_chunks,
-                        max_memory,
-                    ),
-                ),
-            ))
-        });
-
-        document_changes.into_par_iter().try_for_each(|document_change| {
-            context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| {
-                Self::extract_document_change(
-                    &*rtxn,
-                    index,
-                    buffer,
-                    fields_ids_map,
-                    &attributes_to_extract,
-                    cached_sorter,
-                    document_change?,
-                )
-            })
-        })?;
-
-        let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
-        for (_rtxn, _fields_ids_map, _buffer, cache) in context_pool.into_items() {
-            let sorter = cache.into_sorter()?;
-            let readers = sorter.into_reader_cursors()?;
-            builder.extend(readers);
-        }
-
-        Ok(builder.build())
-    }
-
-    // TODO Shorten this
-    fn facet_fn_with_options<MF>(
-        buffer: &mut Vec<u8>,
-        cached_sorter: &mut CboCachedSorter<MF>,
-        cache_fn: impl Fn(&mut CboCachedSorter<MF>, &[u8], u32) -> grenad::Result<(), MF::Error>,
-        docid: DocumentId,
-        fid: FieldId,
-        value: &Value,
-    ) -> Result<()>
-    where
-        MF: MergeFunction,
-        MF::Error: Debug,
-        grenad::Error<MF::Error>: Into<crate::Error>,
-    {
-        buffer.clear();
-        match Self::build_key(fid, value, buffer) {
-            Some(key) => cache_fn(cached_sorter, &key, docid).map_err(Into::into),
-            None => Ok(()),
-        }
-    }
-
-    fn extract_document_change(
-        rtxn: &RoTxn,
-        index: &Index,
-        buffer: &mut Vec<u8>,
-        fields_ids_map: &mut GlobalFieldsIdsMap,
-        attributes_to_extract: &[&str],
-        cached_sorter: &mut CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
-        document_change: DocumentChange,
-    ) -> Result<()> {
-        match document_change {
-            DocumentChange::Deletion(inner) => facet_document::extract_document_facets(
-                attributes_to_extract,
-                inner.current(rtxn, index)?.unwrap(),
-                fields_ids_map,
-                &mut |fid, value| {
-                    Self::facet_fn_with_options(
-                        buffer,
-                        cached_sorter,
-                        CboCachedSorter::insert_del_u32,
-                        inner.docid(),
-                        fid,
-                        value,
-                    )
-                },
-            ),
-            DocumentChange::Update(inner) => {
-                facet_document::extract_document_facets(
-                    attributes_to_extract,
-                    inner.current(rtxn, index)?.unwrap(),
-                    fields_ids_map,
-                    &mut |fid, value| {
-                        Self::facet_fn_with_options(
-                            buffer,
-                            cached_sorter,
-                            CboCachedSorter::insert_del_u32,
-                            inner.docid(),
-                            fid,
-                            value,
-                        )
-                    },
-                )?;
-
-                facet_document::extract_document_facets(
-                    attributes_to_extract,
-                    inner.new(),
-                    fields_ids_map,
-                    &mut |fid, value| {
-                        Self::facet_fn_with_options(
-                            buffer,
-                            cached_sorter,
-                            CboCachedSorter::insert_add_u32,
-                            inner.docid(),
-                            fid,
-                            value,
-                        )
-                    },
-                )
-            }
-            DocumentChange::Insertion(inner) => facet_document::extract_document_facets(
-                attributes_to_extract,
-                inner.new(),
-                fields_ids_map,
-                &mut |fid, value| {
-                    Self::facet_fn_with_options(
-                        buffer,
-                        cached_sorter,
-                        CboCachedSorter::insert_add_u32,
-                        inner.docid(),
-                        fid,
-                        value,
-                    )
-                },
-            ),
-        }
-    }
-
-    // TODO avoid owning the strings here.
-    fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>>;
-
-    fn build_key<'b>(field_id: FieldId, value: &Value, output: &'b mut Vec<u8>)
-        -> Option<&'b [u8]>;
+pub use extract_facets::FacetedDocidsExtractor;
+
+#[repr(u8)]
+pub enum FacetKind {
+    Number = 0,
+    String = 1,
+    Null = 2,
+    Empty = 3,
+    Exists,
+}
+
+impl From<u8> for FacetKind {
+    fn from(value: u8) -> Self {
+        match value {
+            0 => Self::Number,
+            1 => Self::String,
+            2 => Self::Null,
+            3 => Self::Empty,
+            4 => Self::Exists,
+            _ => unreachable!(),
+        }
+    }
 }
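Note: Exists carries no explicit discriminant, but since it follows Empty = 3 it is 4, which matches the `4 => Self::Exists` arm. A round-trip check of the byte encoding (the derives below are added only so the assertion compiles; they are not in the commit):

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum FacetKind {
    Number = 0,
    String = 1,
    Null = 2,
    Empty = 3,
    Exists, // implicitly 4
}

impl From<u8> for FacetKind {
    fn from(value: u8) -> Self {
        match value {
            0 => Self::Number,
            1 => Self::String,
            2 => Self::Null,
            3 => Self::Empty,
            4 => Self::Exists,
            _ => unreachable!(),
        }
    }
}

fn main() {
    // Every kind survives a trip through its u8 tag.
    let kinds = [FacetKind::Number, FacetKind::String, FacetKind::Null, FacetKind::Empty, FacetKind::Exists];
    for kind in kinds {
        assert_eq!(FacetKind::from(kind as u8), kind);
    }
}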

View File

@@ -2,9 +2,29 @@ mod cache;
 mod faceted;
 mod searchable;

+use std::fs::File;
+
 pub use faceted::*;
+use grenad::Merger;
+use rayon::iter::IntoParallelIterator;
 pub use searchable::*;

+use crate::{
+    update::{GrenadParameters, MergeDeladdCboRoaringBitmaps},
+    GlobalFieldsIdsMap, Index, Result,
+};
+
+use super::DocumentChange;
+
+pub trait DocidsExtractor {
+    fn run_extraction(
+        index: &Index,
+        fields_ids_map: &GlobalFieldsIdsMap,
+        indexer: GrenadParameters,
+        document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
+    ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>;
+}
+
 /// TODO move in permissive json pointer
 pub mod perm_json_p {
     use serde_json::{Map, Value};
@@ -39,6 +59,10 @@ pub mod perm_json_p {
         base_key: &str,
         seeker: &mut impl FnMut(&str, &Value) -> Result<()>,
     ) -> Result<()> {
+        if value.is_empty() {
+            seeker(&base_key, &Value::Object(Map::with_capacity(0)))?;
+        }
+
         for (key, value) in value.iter() {
             let base_key = if base_key.is_empty() {
                 key.to_string()
@@ -80,6 +104,10 @@ pub mod perm_json_p {
         base_key: &str,
         seeker: &mut impl FnMut(&str, &Value) -> Result<()>,
     ) -> Result<()> {
+        if values.is_empty() {
+            seeker(&base_key, &Value::Array(vec![]))?;
+        }
+
         for value in values {
             match value {
                 Value::Object(object) => {
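Note: these two guards are what feed the new Empty facet: an empty object or array is now reported to the seeker once instead of being recursed into and producing nothing. A simplified model of the walker's behavior (not the real perm_json_p code, which also filters by field selectors):

use serde_json::{json, Value};

// Simplified walker: leaves and, after this change, empty containers
// all reach the seeker callback.
fn seek(value: &Value, base_key: &str, seeker: &mut impl FnMut(&str, &Value)) {
    match value {
        Value::Object(map) if !map.is_empty() => {
            for (key, value) in map {
                let key =
                    if base_key.is_empty() { key.clone() } else { format!("{base_key}.{key}") };
                seek(value, &key, seeker);
            }
        }
        Value::Array(values) if !values.is_empty() => {
            for value in values {
                seek(value, base_key, seeker);
            }
        }
        other => seeker(base_key, other),
    }
}

fn main() {
    let document = json!({ "tags": [], "metadata": { "scores": {} } });
    seek(&document, "", &mut |key, value| println!("{key} => {value}"));
    // Prints `tags => []` and `metadata.scores => {}`; before the change
    // these values produced no callback at all.
}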

View File

@@ -17,6 +17,7 @@ use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
 use tokenize_document::{tokenizer_builder, DocumentTokenizer};

 use super::cache::CboCachedSorter;
+use super::DocidsExtractor;
 use crate::update::new::{DocumentChange, ItemsPool};
 use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
@@ -130,3 +131,14 @@ pub trait SearchableExtractor {

     fn attributes_to_skip<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<Vec<&'a str>>;
 }
+
+impl<T: SearchableExtractor> DocidsExtractor for T {
+    fn run_extraction(
+        index: &Index,
+        fields_ids_map: &GlobalFieldsIdsMap,
+        indexer: GrenadParameters,
+        document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
+    ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
+        Self::run_extraction(index, fields_ids_map, indexer, document_changes)
+    }
+}
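Note: this blanket impl is what lets extract_and_send_docids below take an `E: DocidsExtractor` bound and work for both the word extractors and the new facet extractor. A toy model of the pattern (illustrative trait names, not the real ones):

// Toy model: every Searchable type becomes Docids for free via a
// blanket impl, so call sites only need the general trait.
trait Searchable {
    fn extract(&self) -> Vec<u32>;
}

trait Docids {
    fn run_extraction(&self) -> Vec<u32>;
}

impl<T: Searchable> Docids for T {
    fn run_extraction(&self) -> Vec<u32> {
        // Forward to the more specific implementation.
        self.extract()
    }
}

struct WordDocidsExtractor;

impl Searchable for WordDocidsExtractor {
    fn extract(&self) -> Vec<u32> {
        vec![1, 2, 3]
    }
}

fn main() {
    // WordDocidsExtractor never implemented Docids directly.
    assert_eq!(WordDocidsExtractor.run_extraction(), vec![1, 2, 3]);
}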

View File

@@ -101,6 +101,22 @@ where
             max_memory: Some(max_memory),
             ..GrenadParameters::default()
         };

+        {
+            let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
+            let _entered = span.enter();
+            extract_and_send_docids::<
+                FacetedDocidsExtractor,
+                FacetDocids,
+            >(
+                index,
+                &global_fields_ids_map,
+                grenad_parameters,
+                document_changes.clone(),
+                &extractor_sender,
+            )?;
+        }
+
         {
             let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
             let _entered = span.enter();
@@ -176,19 +192,19 @@ where
         }

         // TODO THIS IS TOO MUCH
-        // Extract fieldid docid facet number
-        // Extract fieldid docid facet string
-        // Extract facetid string fst
-        // Extract facetid normalized string strings
+        // - [ ] Extract fieldid docid facet number
+        // - [ ] Extract fieldid docid facet string
+        // - [ ] Extract facetid string fst
+        // - [ ] Extract facetid normalized string strings

         // TODO Inverted Indexes again
-        // Extract fieldid facet isempty docids
-        // Extract fieldid facet isnull docids
-        // Extract fieldid facet exists docids
+        // - [x] Extract fieldid facet isempty docids
+        // - [x] Extract fieldid facet isnull docids
+        // - [x] Extract fieldid facet exists docids

         // TODO This is the normal system
-        // Extract fieldid facet number docids
-        // Extract fieldid facet string docids
+        // - [x] Extract fieldid facet number docids
+        // - [x] Extract fieldid facet string docids

         Ok(()) as Result<_>
     })
@@ -238,7 +254,7 @@ where
 /// TODO: GrenadParameters::default() should be removed in favor a passed parameter
 /// TODO: manage the errors correctly
 /// TODO: we must have a single trait that also gives the extractor type
-fn extract_and_send_docids<E: SearchableExtractor, D: DatabaseType>(
+fn extract_and_send_docids<E: DocidsExtractor, D: MergerOperationType>(
     index: &Index,
     fields_ids_map: &GlobalFieldsIdsMap,
     indexer: GrenadParameters,

View File

@@ -12,6 +12,7 @@ use tempfile::tempfile;

 use super::channel::*;
 use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update};
+use super::extract::FacetKind;
 use crate::update::del_add::DelAdd;
 use crate::update::new::channel::MergerOperation;
 use crate::update::MergeDeladdCboRoaringBitmaps;
@@ -63,11 +64,12 @@ pub fn merge_grenad_entries(
                 )?;
             }
             MergerOperation::WordDocidsMerger(merger) => {
+                let mut add_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?;
+                let mut del_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?;
+                {
                     let span =
                         tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
                     let _entered = span.enter();
-                let mut add_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?;
-                let mut del_words_fst = SetBuilder::new(BufWriter::new(tempfile()?))?;

                     merge_and_send_docids(
                         merger,
@@ -78,12 +80,18 @@
                         |key| add_words_fst.insert(key),
                         |key| del_words_fst.insert(key),
                     )?;
+                }
+
+                {
+                    let span =
+                        tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
+                    let _entered = span.enter();

                     // Move that into a dedicated function
                     let words_fst = index.words_fst(rtxn)?;
                     let mmap = compute_new_words_fst(add_words_fst, del_words_fst, words_fst)?;
                     sender.main().write_words_fst(mmap).unwrap();
                 }
+            }
             MergerOperation::WordFidDocidsMerger(merger) => {
                 let span =
                     tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
@@ -161,6 +169,18 @@
             MergerOperation::FinishedDocument => {
                 // send the rtree
             }
+            MergerOperation::FacetDocidsMerger(merger) => {
+                let span =
+                    tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
+                let _entered = span.enter();
+                merge_and_send_facet_docids(
+                    merger,
+                    FacetDatabases::new(index),
+                    rtxn,
+                    &mut buffer,
+                    sender.facet_docids(),
+                )?;
+            }
         }
     }
@@ -252,12 +272,12 @@
 }

 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_docids<D: DatabaseType>(
+fn merge_and_send_docids(
     merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
     database: Database<Bytes, Bytes>,
     rtxn: &RoTxn<'_>,
     buffer: &mut Vec<u8>,
-    word_docids_sender: DocidsSender<'_, D>,
+    docids_sender: impl DocidsSender,
     mut add_key: impl FnMut(&[u8]) -> fst::Result<()>,
     mut del_key: impl FnMut(&[u8]) -> fst::Result<()>,
 ) -> Result<()> {
@@ -271,11 +291,11 @@ fn merge_and_send_docids<D: DatabaseType>(
         match merge_cbo_bitmaps(current, del, add)? {
             Operation::Write(bitmap) => {
                 let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
-                word_docids_sender.write(key, value).unwrap();
+                docids_sender.write(key, value).unwrap();
                 add_key(key)?;
             }
             Operation::Delete => {
-                word_docids_sender.delete(key).unwrap();
+                docids_sender.delete(key).unwrap();
                 del_key(key)?;
             }
             Operation::Ignore => (),
@@ -285,6 +305,76 @@
     Ok(())
 }

+#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
+fn merge_and_send_facet_docids(
+    merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
+    database: FacetDatabases,
+    rtxn: &RoTxn<'_>,
+    buffer: &mut Vec<u8>,
+    docids_sender: impl DocidsSender,
+) -> Result<()> {
+    let mut merger_iter = merger.into_stream_merger_iter().unwrap();
+    while let Some((key, deladd)) = merger_iter.next().unwrap() {
+        let current = database.get(rtxn, key)?;
+        let deladd: &KvReaderDelAdd = deladd.into();
+        let del = deladd.get(DelAdd::Deletion);
+        let add = deladd.get(DelAdd::Addition);
+
+        match merge_cbo_bitmaps(current, del, add)? {
+            Operation::Write(bitmap) => {
+                let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
+                docids_sender.write(key, value).unwrap();
+            }
+            Operation::Delete => {
+                docids_sender.delete(key).unwrap();
+            }
+            Operation::Ignore => (),
+        }
+    }
+
+    Ok(())
+}
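Note: merge_and_send_facet_docids mirrors merge_and_send_docids but drops the fst callbacks and resolves the stored bitmap through FacetDatabases. A rough model of the three-way decision merge_cbo_bitmaps feeds into the match above (the real function works on CBO-serialized bytes and returns a Result; the Ignore-on-unchanged behavior is inferred from the variant names):

use roaring::RoaringBitmap;

enum Operation {
    Write(RoaringBitmap),
    Delete,
    Ignore,
}

// Rough model: subtract the deletions, union the additions, then decide
// whether the entry must be rewritten, removed, or left untouched.
fn merge(
    current: Option<RoaringBitmap>,
    del: Option<RoaringBitmap>,
    add: Option<RoaringBitmap>,
) -> Operation {
    let current = current.unwrap_or_else(RoaringBitmap::new);
    let mut merged = current.clone();
    if let Some(del) = del {
        merged -= del;
    }
    if let Some(add) = add {
        merged |= add;
    }
    if merged == current {
        Operation::Ignore
    } else if merged.is_empty() {
        Operation::Delete
    } else {
        Operation::Write(merged)
    }
}

fn main() {
    let current: RoaringBitmap = [1u32, 2, 3].into_iter().collect();
    let del: RoaringBitmap = [1u32, 2, 3].into_iter().collect();
    // Deleting every remaining docid should remove the LMDB entry.
    assert!(matches!(merge(Some(current), Some(del), None), Operation::Delete));
}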
+struct FacetDatabases {
+    /// Maps the facet field id and the docids for which this field exists
+    facet_id_exists_docids: Database<Bytes, Bytes>,
+    /// Maps the facet field id and the docids for which this field is set as null
+    facet_id_is_null_docids: Database<Bytes, Bytes>,
+    /// Maps the facet field id and the docids for which this field is considered empty
+    facet_id_is_empty_docids: Database<Bytes, Bytes>,
+    /// Maps the facet field id and ranges of numbers with the docids that correspond to them
+    facet_id_f64_docids: Database<Bytes, Bytes>,
+    /// Maps the facet field id and ranges of strings with the docids that correspond to them
+    facet_id_string_docids: Database<Bytes, Bytes>,
+}
+
+impl FacetDatabases {
+    fn new(index: &Index) -> Self {
+        Self {
+            facet_id_exists_docids: index.facet_id_exists_docids.remap_types(),
+            facet_id_is_null_docids: index.facet_id_is_null_docids.remap_types(),
+            facet_id_is_empty_docids: index.facet_id_is_empty_docids.remap_types(),
+            facet_id_f64_docids: index.facet_id_f64_docids.remap_types(),
+            facet_id_string_docids: index.facet_id_string_docids.remap_types(),
+        }
+    }
+
+    fn get<'a>(&self, rtxn: &'a RoTxn<'_>, key: &[u8]) -> heed::Result<Option<&'a [u8]>> {
+        let (facet_kind, key) = self.extract_facet_kind(key);
+        match facet_kind {
+            FacetKind::Exists => self.facet_id_exists_docids.get(rtxn, key),
+            FacetKind::Null => self.facet_id_is_null_docids.get(rtxn, key),
+            FacetKind::Empty => self.facet_id_is_empty_docids.get(rtxn, key),
+            FacetKind::Number => self.facet_id_f64_docids.get(rtxn, key),
+            FacetKind::String => self.facet_id_string_docids.get(rtxn, key),
+        }
+    }
+
+    fn extract_facet_kind<'a>(&self, key: &'a [u8]) -> (FacetKind, &'a [u8]) {
+        (FacetKind::from(key[0]), &key[1..])
+    }
+}
+
 enum Operation {
     Write(RoaringBitmap),
     Delete,