Merge pull request #5019 from meilisearch/indexer-edition-2024-bumpalo-in-extractors

Implement facet search extraction
This commit is contained in:
Many the fish 2024-10-23 10:42:38 +02:00 committed by GitHub
commit 3d29226a7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 555 additions and 134 deletions

View File

@ -144,6 +144,8 @@ pub enum Database {
FacetIdExistsDocids, FacetIdExistsDocids,
FacetIdF64NumberDocids, FacetIdF64NumberDocids,
FacetIdStringDocids, FacetIdStringDocids,
FacetIdNormalizedStringStrings,
FacetIdStringFst,
} }
impl Database { impl Database {
@ -163,6 +165,10 @@ impl Database {
Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(), Database::FacetIdExistsDocids => index.facet_id_exists_docids.remap_types(),
Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(), Database::FacetIdF64NumberDocids => index.facet_id_f64_docids.remap_types(),
Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(), Database::FacetIdStringDocids => index.facet_id_string_docids.remap_types(),
Database::FacetIdNormalizedStringStrings => {
index.facet_id_normalized_string_strings.remap_types()
}
Database::FacetIdStringFst => index.facet_id_string_fst.remap_types(),
} }
} }
} }
@ -240,6 +246,10 @@ impl MergerSender {
DocumentsSender(self) DocumentsSender(self)
} }
pub fn facet_searchable(&self) -> FacetSearchableSender<'_> {
FacetSearchableSender { sender: self }
}
pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> { pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap( let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap(
DOCUMENTS_IDS_KEY.as_bytes(), DOCUMENTS_IDS_KEY.as_bytes(),
@ -445,6 +455,50 @@ impl DocidsSender for FacetDocidsSender<'_> {
} }
} }
pub struct FacetSearchableSender<'a> {
sender: &'a MergerSender,
}
impl FacetSearchableSender<'_> {
pub fn write_facet(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
match self
.sender
.send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
{
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete_facet(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self
.sender
.send(WriterOperation { database: Database::FacetIdNormalizedStringStrings, entry })
{
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn write_fst(&self, key: &[u8], value: Mmap) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Write(KeyValueEntry::from_large_key_value(key, value));
match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete_fst(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self.sender.send(WriterOperation { database: Database::FacetIdStringFst, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
pub struct DocumentsSender<'a>(&'a MergerSender); pub struct DocumentsSender<'a>(&'a MergerSender);
impl DocumentsSender<'_> { impl DocumentsSender<'_> {

View File

@ -1,9 +1,11 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::mem::size_of;
use std::num::NonZero; use std::num::NonZero;
use std::ops::DerefMut as _; use std::ops::DerefMut as _;
use bumpalo::collections::vec::Vec as BumpVec;
use bumpalo::Bump; use bumpalo::Bump;
use grenad::{Merger, MergerBuilder}; use grenad::{Merger, MergerBuilder};
use heed::RoTxn; use heed::RoTxn;
@ -118,30 +120,33 @@ impl WordDocidsCachedSorters {
word: &str, word: &str,
exact: bool, exact: bool,
docid: u32, docid: u32,
buffer: &mut Vec<u8>, bump: &Bump,
) -> Result<()> { ) -> Result<()> {
let key = word.as_bytes(); let word_bytes = word.as_bytes();
if exact { if exact {
self.exact_word_docids.insert_add_u32(key, docid)?; self.exact_word_docids.insert_add_u32(word_bytes, docid)?;
} else { } else {
self.word_docids.insert_add_u32(key, docid)?; self.word_docids.insert_add_u32(word_bytes, docid)?;
} }
let buffer_size = word_bytes.len() + 1 + size_of::<FieldId>();
let mut buffer = BumpVec::with_capacity_in(buffer_size, bump);
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word_bytes);
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&field_id.to_be_bytes()); buffer.extend_from_slice(&field_id.to_be_bytes());
self.word_fid_docids.insert_add_u32(buffer, docid)?; self.word_fid_docids.insert_add_u32(&buffer, docid)?;
let position = bucketed_position(position); let position = bucketed_position(position);
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word_bytes);
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&position.to_be_bytes()); buffer.extend_from_slice(&position.to_be_bytes());
self.word_position_docids.insert_add_u32(buffer, docid)?; self.word_position_docids.insert_add_u32(&buffer, docid)?;
if self.current_docid.map_or(false, |id| docid != id) { if self.current_docid.map_or(false, |id| docid != id) {
self.flush_fid_word_count(buffer)?; self.flush_fid_word_count(&mut buffer)?;
} }
self.fid_word_count self.fid_word_count
@ -160,30 +165,33 @@ impl WordDocidsCachedSorters {
word: &str, word: &str,
exact: bool, exact: bool,
docid: u32, docid: u32,
buffer: &mut Vec<u8>, bump: &Bump,
) -> Result<()> { ) -> Result<()> {
let key = word.as_bytes(); let word_bytes = word.as_bytes();
if exact { if exact {
self.exact_word_docids.insert_del_u32(key, docid)?; self.exact_word_docids.insert_del_u32(word_bytes, docid)?;
} else { } else {
self.word_docids.insert_del_u32(key, docid)?; self.word_docids.insert_del_u32(word_bytes, docid)?;
} }
let buffer_size = word_bytes.len() + 1 + size_of::<FieldId>();
let mut buffer = BumpVec::with_capacity_in(buffer_size, bump);
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word_bytes);
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&field_id.to_be_bytes()); buffer.extend_from_slice(&field_id.to_be_bytes());
self.word_fid_docids.insert_del_u32(buffer, docid)?; self.word_fid_docids.insert_del_u32(&buffer, docid)?;
let position = bucketed_position(position); let position = bucketed_position(position);
buffer.clear(); buffer.clear();
buffer.extend_from_slice(word.as_bytes()); buffer.extend_from_slice(word_bytes);
buffer.push(0); buffer.push(0);
buffer.extend_from_slice(&position.to_be_bytes()); buffer.extend_from_slice(&position.to_be_bytes());
self.word_position_docids.insert_del_u32(buffer, docid)?; self.word_position_docids.insert_del_u32(&buffer, docid)?;
if self.current_docid.map_or(false, |id| docid != id) { if self.current_docid.map_or(false, |id| docid != id) {
self.flush_fid_word_count(buffer)?; self.flush_fid_word_count(&mut buffer)?;
} }
self.fid_word_count self.fid_word_count
@ -195,7 +203,7 @@ impl WordDocidsCachedSorters {
Ok(()) Ok(())
} }
fn flush_fid_word_count(&mut self, buffer: &mut Vec<u8>) -> Result<()> { fn flush_fid_word_count(&mut self, buffer: &mut BumpVec<u8>) -> Result<()> {
for (fid, (current_count, new_count)) in self.fid_word_count.drain() { for (fid, (current_count, new_count)) in self.fid_word_count.drain() {
if current_count != new_count { if current_count != new_count {
if current_count <= MAX_COUNTED_WORDS { if current_count <= MAX_COUNTED_WORDS {
@ -420,11 +428,11 @@ impl WordDocidsExtractors {
let cached_sorter = cached_sorter.deref_mut(); let cached_sorter = cached_sorter.deref_mut();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = new_fields_ids_map.deref_mut(); let new_fields_ids_map = new_fields_ids_map.deref_mut();
let doc_alloc = &context.doc_alloc;
let exact_attributes = index.exact_attributes(rtxn)?; let exact_attributes = index.exact_attributes(rtxn)?;
let is_exact_attribute = let is_exact_attribute =
|fname: &str| exact_attributes.iter().any(|attr| contained_in(fname, attr)); |fname: &str| exact_attributes.iter().any(|attr| contained_in(fname, attr));
let mut buffer = Vec::new();
match document_change { match document_change {
DocumentChange::Deletion(inner) => { DocumentChange::Deletion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| { let mut token_fn = |fname: &str, fid, pos, word: &str| {
@ -435,7 +443,7 @@ impl WordDocidsExtractors {
word, word,
is_exact_attribute(fname), is_exact_attribute(fname),
inner.docid(), inner.docid(),
&mut buffer, doc_alloc,
) )
.map_err(crate::Error::from) .map_err(crate::Error::from)
}; };
@ -454,7 +462,7 @@ impl WordDocidsExtractors {
word, word,
is_exact_attribute(fname), is_exact_attribute(fname),
inner.docid(), inner.docid(),
&mut buffer, doc_alloc,
) )
.map_err(crate::Error::from) .map_err(crate::Error::from)
}; };
@ -472,7 +480,7 @@ impl WordDocidsExtractors {
word, word,
is_exact_attribute(fname), is_exact_attribute(fname),
inner.docid(), inner.docid(),
&mut buffer, doc_alloc,
) )
.map_err(crate::Error::from) .map_err(crate::Error::from)
}; };
@ -491,7 +499,7 @@ impl WordDocidsExtractors {
word, word,
is_exact_attribute(fname), is_exact_attribute(fname),
inner.docid(), inner.docid(),
&mut buffer, doc_alloc,
) )
.map_err(crate::Error::from) .map_err(crate::Error::from)
}; };
@ -503,6 +511,8 @@ impl WordDocidsExtractors {
} }
} }
let buffer_size = size_of::<FieldId>();
let mut buffer = BumpVec::with_capacity_in(buffer_size, &context.doc_alloc);
cached_sorter.flush_fid_word_count(&mut buffer) cached_sorter.flush_fid_word_count(&mut buffer)
} }

View File

@ -0,0 +1,275 @@
use std::collections::{BTreeSet, HashMap};
use charabia::{normalizer::NormalizerOption, Language, Normalize, StrDetection, Token};
use grenad::Sorter;
use heed::{
types::{Bytes, SerdeJson},
BytesDecode, BytesEncode, RoTxn,
};
use crate::{
heed_codec::{
facet::{FacetGroupKey, FacetGroupKeyCodec},
StrRefCodec,
},
update::{
create_sorter,
del_add::{DelAdd, KvWriterDelAdd},
MergeDeladdBtreesetString,
},
BEU16StrCodec, FieldId, GlobalFieldsIdsMap, Index, LocalizedAttributesRule, Result,
MAX_FACET_VALUE_LENGTH,
};
use super::{
channel::FacetSearchableSender, extract::FacetKind, fst_merger_builder::FstMergerBuilder,
KvReaderDelAdd,
};
/// Accumulates the data needed for facet search: for every string facet field,
/// the normalized facet values and (later) the FST built over them.
pub struct FacetSearchBuilder<'indexer> {
    // Count of registered (added) facet values per field id.
    registered_facets: HashMap<FieldId, usize>,
    // Sorter keyed by `(field_id, normalized facet string)`, holding DelAdd obkvs
    // whose values are JSON-serialized `BTreeSet`s of original facet strings.
    normalized_facet_string_docids_sorter: Sorter<MergeDeladdBtreesetString>,
    global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
    localized_attributes_rules: Vec<LocalizedAttributesRule>,
    // Buffered data below
    buffer: Vec<u8>,
    // Cache of the locales matched by each field id
    // (`None` when no localized-attributes rule matches the field).
    localized_field_ids: HashMap<FieldId, Option<Vec<Language>>>,
}
impl<'indexer> FacetSearchBuilder<'indexer> {
    pub fn new(
        global_fields_ids_map: GlobalFieldsIdsMap<'indexer>,
        localized_attributes_rules: Vec<LocalizedAttributesRule>,
    ) -> Self {
        let registered_facets = HashMap::new();
        let normalized_facet_string_docids_sorter = create_sorter(
            grenad::SortAlgorithm::Stable,
            MergeDeladdBtreesetString,
            grenad::CompressionType::None,
            None,
            None,
            Some(0),
        );

        Self {
            registered_facets,
            normalized_facet_string_docids_sorter,
            buffer: Vec::new(),
            global_fields_ids_map,
            localized_attributes_rules,
            localized_field_ids: HashMap::new(),
        }
    }

    /// Decodes the facet key when it is a string facet, the only searchable kind.
    /// Returns `None` for any other facet kind.
    fn extract_key_data<'k>(&self, key: &'k [u8]) -> Result<Option<FacetGroupKey<&'k str>>> {
        match FacetKind::from(key[0]) {
            // Only strings are searchable
            FacetKind::String => Ok(Some(
                FacetGroupKeyCodec::<StrRefCodec>::bytes_decode(&key[1..])
                    .map_err(heed::Error::Encoding)?,
            )),
            _ => Ok(None),
        }
    }

    /// Registers a facet key coming from the facet docids merge, pushing the
    /// normalized facet string (with its DelAdd side) into the sorter.
    pub fn register_from_key(&mut self, deladd: DelAdd, facet_key: &[u8]) -> Result<()> {
        let Some(FacetGroupKey { field_id, level: _level, left_bound }) =
            self.extract_key_data(facet_key)?
        else {
            return Ok(());
        };

        if deladd == DelAdd::Addition {
            self.registered_facets.entry(field_id).and_modify(|count| *count += 1).or_insert(1);
        }

        let locales = self.locales(field_id);
        let hyper_normalized_value = normalize_facet_string(left_bound, locales.as_deref());

        let set = BTreeSet::from_iter(std::iter::once(left_bound));

        // as the facet string is the same, we can put the deletion and addition in the same obkv.
        self.buffer.clear();
        let mut obkv = KvWriterDelAdd::new(&mut self.buffer);
        let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
        obkv.insert(deladd, val)?;
        obkv.finish()?;

        let key: (u16, &str) = (field_id, hyper_normalized_value.as_ref());
        let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
        self.normalized_facet_string_docids_sorter.insert(key_bytes, &self.buffer)?;

        Ok(())
    }

    /// Returns the locales associated with a field id, resolving and caching
    /// them from the localized-attributes rules on first access.
    fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
        if !self.localized_field_ids.contains_key(&field_id) {
            let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
                unreachable!("Field id {} not found in the global fields ids map", field_id);
            };

            let locales = self
                .localized_attributes_rules
                .iter()
                .find(|rule| rule.match_str(field_name))
                .map(|rule| rule.locales.clone());

            self.localized_field_ids.insert(field_id, locales);
        }

        self.localized_field_ids.get(&field_id).unwrap().as_deref()
    }

    /// Merges the sorted normalized facet strings with the content of the
    /// database, sending the resulting entries and the rebuilt per-field FSTs
    /// through the channel. Entries come out of the merger sorted by
    /// `(field_id, normalized string)`, so one FST is built per field id.
    #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_fst")]
    pub fn merge_and_send(
        self,
        index: &Index,
        rtxn: &RoTxn<'_>,
        sender: FacetSearchableSender,
    ) -> Result<()> {
        let reader = self.normalized_facet_string_docids_sorter.into_reader_cursors()?;
        let mut builder = grenad::MergerBuilder::new(MergeDeladdBtreesetString);
        builder.extend(reader);

        let database = index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>();

        let mut merger_iter = builder.build().into_stream_merger_iter()?;
        let mut current_field_id = None;
        let mut fst;
        let mut fst_merger_builder: Option<FstMergerBuilder> = None;
        while let Some((key, deladd)) = merger_iter.next()? {
            let (field_id, normalized_facet_string) =
                BEU16StrCodec::bytes_decode(&key).map_err(heed::Error::Encoding)?;

            if current_field_id != Some(field_id) {
                if let Some(fst_merger_builder) = fst_merger_builder {
                    // Flush the finished FST of the *previous* field.
                    // BUGFIX: it was keyed with the new `field_id`, overwriting
                    // the wrong entry; `current_field_id` still holds the
                    // previous field id at this point.
                    let previous_field_id = current_field_id
                        .expect("fst_merger_builder is always set along with current_field_id");
                    let mmap = fst_merger_builder.build(&mut callback)?;
                    sender.write_fst(&previous_field_id.to_be_bytes(), mmap).unwrap();
                }

                // Removed leftover debug `println!` that was flooding stdout.
                fst = index.facet_id_string_fst.get(rtxn, &field_id)?;
                fst_merger_builder = Some(FstMergerBuilder::new(fst.as_ref())?);
                current_field_id = Some(field_id);
            }

            let current = database.get(rtxn, key)?;
            let deladd: &KvReaderDelAdd = deladd.into();
            let del = deladd.get(DelAdd::Deletion);
            let add = deladd.get(DelAdd::Addition);

            match merge_btreesets(current, del, add)? {
                Operation::Write(value) => {
                    let fst_merger_builder = fst_merger_builder
                        .as_mut()
                        .expect("initialized above when the field id changed");
                    fst_merger_builder.register(
                        DelAdd::Addition,
                        normalized_facet_string.as_bytes(),
                        &mut callback,
                    )?;

                    let key = (field_id, normalized_facet_string);
                    let key_bytes =
                        BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
                    sender.write_facet(&key_bytes, &value).unwrap();
                }
                Operation::Delete => {
                    let fst_merger_builder = fst_merger_builder
                        .as_mut()
                        .expect("initialized above when the field id changed");
                    fst_merger_builder.register(
                        DelAdd::Deletion,
                        normalized_facet_string.as_bytes(),
                        &mut callback,
                    )?;

                    let key = (field_id, normalized_facet_string);
                    let key_bytes =
                        BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
                    sender.delete_facet(&key_bytes).unwrap();
                }
                Operation::Ignore => (),
            }
        }

        // Flush the FST of the last seen field, if any.
        if let (Some(field_id), Some(fst_merger_builder)) = (current_field_id, fst_merger_builder) {
            let mmap = fst_merger_builder.build(&mut callback)?;
            sender.write_fst(&field_id.to_be_bytes(), mmap).unwrap();
        }

        Ok(())
    }
}
/// No-op callback handed to `FstMergerBuilder`: facet-search building does not
/// need per-word notifications, only the resulting FST.
fn callback(_bytes: &[u8], _deladd: DelAdd, _is_modified: bool) -> Result<()> {
    Ok(())
}
fn merge_btreesets<'a>(
current: Option<&[u8]>,
del: Option<&[u8]>,
add: Option<&[u8]>,
) -> Result<Operation> {
let mut result: BTreeSet<String> = match current {
Some(current) => SerdeJson::bytes_decode(current).map_err(heed::Error::Encoding)?,
None => BTreeSet::new(),
};
if let Some(del) = del {
let del: BTreeSet<String> = SerdeJson::bytes_decode(del).map_err(heed::Error::Encoding)?;
result = result.difference(&del).cloned().collect();
}
if let Some(add) = add {
let add: BTreeSet<String> = SerdeJson::bytes_decode(add).map_err(heed::Error::Encoding)?;
result.extend(add);
}
/// TODO remove allocation
let result = SerdeJson::bytes_encode(&result).map_err(heed::Error::Encoding)?.into_owned();
if Some(result.as_ref()) == current {
Ok(Operation::Ignore)
} else if result.is_empty() {
Ok(Operation::Delete)
} else {
Ok(Operation::Write(result))
}
}
/// Normalizes the facet string and truncates it to the max length.
///
/// The normalization is lossy (`lossy: true`) and locale-aware: when exactly
/// one locale is provided it is used directly; when several are provided the
/// language is detected among them; otherwise no language is assumed.
fn normalize_facet_string(facet_string: &str, locales: Option<&[Language]>) -> String {
    let options: NormalizerOption = NormalizerOption { lossy: true, ..Default::default() };
    let mut detection = StrDetection::new(facet_string, locales);

    let script = detection.script();
    // Detect the language of the facet string only if several locales are explicitly provided.
    let language = match locales {
        Some(&[language]) => Some(language),
        Some(multiple_locales) if multiple_locales.len() > 1 => detection.language(),
        _ => None,
    };

    let token = Token {
        lemma: std::borrow::Cow::Borrowed(facet_string),
        script,
        language,
        ..Default::default()
    };

    // truncate the facet string to the max length
    // NOTE(review): `idx` is a *byte* index of the char start, so a multi-byte
    // char beginning just below the limit is kept whole — the result may exceed
    // MAX_FACET_VALUE_LENGTH by up to 3 bytes; confirm this is intended.
    token
        .normalize(&options)
        .lemma
        .char_indices()
        .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
        .map(|(_, c)| c)
        .collect()
}
/// Outcome of merging a stored set with its deletions and additions.
enum Operation {
    // Write the serialized merged set under the key.
    Write(Vec<u8>),
    // The merged set is empty: delete the key.
    Delete,
    // The merged set equals the stored value: nothing to do.
    Ignore,
}

View File

@ -0,0 +1,155 @@
use std::{fs::File, io::BufWriter};
use fst::{Set, SetBuilder, Streamer};
use memmap2::Mmap;
use tempfile::tempfile;
use crate::{update::del_add::DelAdd, InternalError, Result};
/// Merges the (sorted) entries of an optional existing FST with new entries
/// arriving in sorted order, building the resulting FST in a temporary file.
pub struct FstMergerBuilder<'a> {
    // Remaining stream of the existing FST, consumed lazily while registering.
    stream: Option<fst::set::Stream<'a>>,
    // The FST under construction, spilled to a buffered temporary file.
    fst_builder: SetBuilder<BufWriter<File>>,
    // Last word popped from `stream` that has not been inserted yet
    // (it compared greater than the incoming word).
    last: Option<Vec<u8>>,
    // Number of words actually inserted into the new FST.
    inserted_words: usize,
}
impl<'a> FstMergerBuilder<'a> {
    /// Creates a builder over an optional existing FST; its stream is drained
    /// lazily as new entries are registered.
    pub fn new<D: AsRef<[u8]>>(fst: Option<&'a Set<D>>) -> Result<Self> {
        Ok(Self {
            stream: fst.map(|fst| fst.stream()),
            fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
            last: None,
            inserted_words: 0,
        })
    }

    /// Registers an incoming word (`right`) to add or delete.
    ///
    /// Incoming words must arrive in ascending byte order: the existing FST's
    /// stream is advanced up to `right`, re-inserting every smaller existing
    /// word on the way, and the first existing word greater than `right` is
    /// parked in `self.last` for the next call. `insertion_callback` is
    /// invoked for every word considered, with `is_modified == true` only for
    /// the incoming word itself.
    pub fn register(
        &mut self,
        deladd: DelAdd,
        right: &[u8],
        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
    ) -> Result<()> {
        // First, compare against the word parked by a previous call, if any.
        if let Some(left) = self.last.take() {
            let (left_inserted, right_inserted) =
                self.compare_and_insert(deladd, left.as_slice(), right, insertion_callback)?;

            // left was not inserted, so we keep it for the next iteration
            if !left_inserted {
                self.last = Some(left);
            }

            // right was inserted, so we can stop
            if right_inserted {
                return Ok(());
            }
        }

        // Then advance the existing FST's stream until `right` is placed.
        if let Some(mut stream) = self.stream.take() {
            while let Some(left) = stream.next() {
                let (left_inserted, right_inserted) =
                    self.compare_and_insert(deladd, left, right, insertion_callback)?;

                // left was not inserted, so we keep it for the next iteration
                if !left_inserted {
                    self.last = Some(left.to_vec());
                }

                // right was inserted, so we can stop
                if right_inserted {
                    self.stream = Some(stream);
                    return Ok(());
                }
            }
        }

        // If we reach this point, it means that the stream is empty
        // and we need to insert the incoming word
        self.insert(right, deladd, true, insertion_callback)?;

        Ok(())
    }

    /// Compares one existing word (`left`) with the incoming word (`right`)
    /// and inserts whichever must come first, returning
    /// `(left_inserted, right_inserted)`. At least one side is always
    /// inserted.
    fn compare_and_insert(
        &mut self,
        deladd: DelAdd,
        left: &[u8],
        right: &[u8],
        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
    ) -> Result<(bool, bool)> {
        let mut left_inserted = false;
        let mut right_inserted = false;
        match left.cmp(right) {
            std::cmp::Ordering::Less => {
                // We need to insert the last word from the current fst
                self.insert(left, DelAdd::Addition, false, insertion_callback)?;

                left_inserted = true;
            }
            std::cmp::Ordering::Equal => {
                // Same word on both sides: the incoming DelAdd decides whether
                // it survives, and the existing occurrence is consumed too.
                self.insert(right, deladd, true, insertion_callback)?;

                left_inserted = true;
                right_inserted = true;
            }
            std::cmp::Ordering::Greater => {
                self.insert(right, deladd, true, insertion_callback)?;

                right_inserted = true;
            }
        }

        Ok((left_inserted, right_inserted))
    }

    /// Inserts (Addition) or drops (Deletion) a word in the FST being built,
    /// then notifies the callback either way.
    fn insert(
        &mut self,
        bytes: &[u8],
        deladd: DelAdd,
        is_modified: bool,
        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
    ) -> Result<()> {
        // Addition: We insert the word
        // Deletion: We delete the word by not inserting it
        if deladd == DelAdd::Addition {
            self.inserted_words += 1;
            self.fst_builder.insert(bytes)?;
        }

        insertion_callback(bytes, deladd, is_modified)?;

        Ok(())
    }

    /// Flushes the parked word and the rest of the existing FST's stream into
    /// the new FST (all as unmodified additions).
    fn drain_stream(
        &mut self,
        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
    ) -> Result<()> {
        if let Some(last) = self.last.take() {
            self.insert(last.as_slice(), DelAdd::Addition, false, insertion_callback)?;
        }

        if let Some(mut stream) = self.stream.take() {
            while let Some(current) = stream.next() {
                self.insert(current, DelAdd::Addition, false, insertion_callback)?;
            }
        }

        Ok(())
    }

    /// Drains what remains of the existing FST, finishes the builder, and
    /// returns the resulting FST as a memory map of the temporary file.
    pub fn build(
        mut self,
        insertion_callback: &mut impl FnMut(&[u8], DelAdd, bool) -> Result<()>,
    ) -> Result<Mmap> {
        self.drain_stream(insertion_callback)?;

        let fst_file = self
            .fst_builder
            .into_inner()?
            .into_inner()
            .map_err(|_| InternalError::IndexingMergingKeys { process: "building-fst" })?;
        // SAFETY-adjacent note: mapping a just-written temporary file; the file
        // handle stays owned by the returned Mmap.
        let fst_mmap = unsafe { Mmap::map(&fst_file)? };

        Ok(fst_mmap)
    }
}

View File

@ -10,13 +10,17 @@ use roaring::RoaringBitmap;
use super::channel::*; use super::channel::*;
use super::extract::FacetKind; use super::extract::FacetKind;
use super::facet_search_builder::FacetSearchBuilder;
use super::word_fst_builder::{PrefixData, PrefixDelta}; use super::word_fst_builder::{PrefixData, PrefixDelta};
use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId}; use super::{Deletion, DocumentChange, KvReaderDelAdd, KvReaderFieldId};
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation; use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder; use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps; use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Result}; use crate::{
localized_attributes_rules, CboRoaringBitmapCodec, Error, FieldId, GeoPoint,
GlobalFieldsIdsMap, Index, Result,
};
/// TODO We must return some infos/stats /// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
@ -170,6 +174,12 @@ pub fn merge_grenad_entries(
tracing::trace_span!(target: "indexing::documents::merge", "facet_docids"); tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
let _entered = span.enter(); let _entered = span.enter();
let mut facet_field_ids_delta = FacetFieldIdsDelta::new(); let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
let localized_attributes_rules =
index.localized_attributes_rules(rtxn)?.unwrap_or_default();
let mut facet_search_builder = FacetSearchBuilder::new(
global_fields_ids_map.clone(),
localized_attributes_rules,
);
merge_and_send_facet_docids( merge_and_send_facet_docids(
merger, merger,
FacetDatabases::new(index), FacetDatabases::new(index),
@ -177,9 +187,12 @@ pub fn merge_grenad_entries(
&mut buffer, &mut buffer,
sender.facet_docids(), sender.facet_docids(),
&mut facet_field_ids_delta, &mut facet_field_ids_delta,
&mut facet_search_builder,
)?; )?;
merger_result.facet_field_ids_delta = Some(facet_field_ids_delta); merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
// merge and send the facet fst and the searchable facet values
facet_search_builder.merge_and_send(index, rtxn, sender.facet_searchable())?;
} }
} }
} }
@ -294,6 +307,7 @@ fn merge_and_send_facet_docids(
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender, docids_sender: impl DocidsSender,
facet_field_ids_delta: &mut FacetFieldIdsDelta, facet_field_ids_delta: &mut FacetFieldIdsDelta,
facet_search_builder: &mut FacetSearchBuilder,
) -> Result<()> { ) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap(); let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() { while let Some((key, deladd)) = merger_iter.next().unwrap() {
@ -305,11 +319,13 @@ fn merge_and_send_facet_docids(
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key); facet_field_ids_delta.register_from_key(key);
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer); let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(key, value).unwrap(); docids_sender.write(key, value).unwrap();
} }
Operation::Delete => { Operation::Delete => {
facet_field_ids_delta.register_from_key(key); facet_field_ids_delta.register_from_key(key);
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
docids_sender.delete(key).unwrap(); docids_sender.delete(key).unwrap();
} }
Operation::Ignore => (), Operation::Ignore => (),

View File

@ -8,6 +8,8 @@ mod channel;
pub mod document; pub mod document;
mod document_change; mod document_change;
mod extract; mod extract;
mod facet_search_builder;
mod fst_merger_builder;
pub mod indexer; pub mod indexer;
mod merger; mod merger;
mod parallel_iterator_ext; mod parallel_iterator_ext;

View File

@ -1,4 +1,4 @@
use std::{fs::File, io::BufWriter}; use std::io::BufWriter;
use fst::{Set, SetBuilder, Streamer}; use fst::{Set, SetBuilder, Streamer};
use memmap2::Mmap; use memmap2::Mmap;
@ -7,23 +7,19 @@ use tempfile::tempfile;
use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result}; use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
use super::fst_merger_builder::FstMergerBuilder;
pub struct WordFstBuilder<'a> { pub struct WordFstBuilder<'a> {
stream: Option<fst::set::Stream<'a>>, word_fst_builder: FstMergerBuilder<'a>,
word_fst_builder: SetBuilder<BufWriter<File>>,
last_word: Option<Vec<u8>>,
prefix_fst_builder: Option<PrefixFstBuilder>, prefix_fst_builder: Option<PrefixFstBuilder>,
inserted_words: usize,
registered_words: usize, registered_words: usize,
} }
impl<'a> WordFstBuilder<'a> { impl<'a> WordFstBuilder<'a> {
pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> { pub fn new(words_fst: &'a Set<std::borrow::Cow<'a, [u8]>>) -> Result<Self> {
Ok(Self { Ok(Self {
stream: Some(words_fst.stream()), word_fst_builder: FstMergerBuilder::new(Some(words_fst))?,
word_fst_builder: SetBuilder::new(BufWriter::new(tempfile()?))?,
prefix_fst_builder: None, prefix_fst_builder: None,
last_word: None,
inserted_words: 0,
registered_words: 0, registered_words: 0,
}) })
} }
@ -38,100 +34,13 @@ impl<'a> WordFstBuilder<'a> {
self.registered_words += 1; self.registered_words += 1;
} }
if let Some(left) = self.last_word.take() { self.word_fst_builder.register(deladd, right, &mut |bytes, deladd, is_modified| {
let (left_inserted, right_inserted) = if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
self.compare_and_insert(deladd, left.as_slice(), right)?; prefix_fst_builder.insert_word(bytes, deladd, is_modified)
} else {
// left was not inserted, so we keep it for the next iteration
if !left_inserted {
self.last_word = Some(left);
}
// right was inserted, so we can stop
if right_inserted {
return Ok(());
}
}
if let Some(mut stream) = self.stream.take() {
while let Some(left) = stream.next() {
let (left_inserted, right_inserted) =
self.compare_and_insert(deladd, left, right)?;
// left was not inserted, so we keep it for the next iteration
if !left_inserted {
self.last_word = Some(left.to_vec());
}
// right was inserted, so we can stop
if right_inserted {
self.stream = Some(stream);
return Ok(());
}
}
// If we reach this point, it means that the stream is empty
// and we need to insert the incoming word
self.insert_word(right, deladd, true)?;
self.stream = Some(stream);
}
Ok(()) Ok(())
} }
})?;
pub fn compare_and_insert(
&mut self,
deladd: DelAdd,
left: &[u8],
right: &[u8],
) -> Result<(bool, bool)> {
let mut left_inserted = false;
let mut right_inserted = false;
match left.cmp(right) {
std::cmp::Ordering::Less => {
// We need to insert the last word from the current fst
self.insert_word(left, DelAdd::Addition, false)?;
left_inserted = true;
}
std::cmp::Ordering::Equal => {
self.insert_word(right, deladd, true)?;
left_inserted = true;
right_inserted = true;
}
std::cmp::Ordering::Greater => {
self.insert_word(right, deladd, true)?;
right_inserted = true;
}
}
Ok((left_inserted, right_inserted))
}
fn insert_word(&mut self, bytes: &[u8], deladd: DelAdd, is_modified: bool) -> Result<()> {
// Addition: We insert the word
// Deletion: We delete the word by not inserting it
if deladd == DelAdd::Addition {
self.inserted_words += 1;
self.word_fst_builder.insert(bytes)?;
}
if let Some(prefix_fst_builder) = self.prefix_fst_builder.as_mut() {
prefix_fst_builder.insert_word(bytes, deladd, is_modified)?;
}
Ok(())
}
fn drain_stream(&mut self) -> Result<()> {
if let Some(mut stream) = self.stream.take() {
while let Some(current) = stream.next() {
self.insert_word(current, DelAdd::Addition, false)?;
}
}
Ok(()) Ok(())
} }
@ -141,13 +50,13 @@ impl<'a> WordFstBuilder<'a> {
index: &crate::Index, index: &crate::Index,
rtxn: &heed::RoTxn, rtxn: &heed::RoTxn,
) -> Result<(Mmap, Option<PrefixData>)> { ) -> Result<(Mmap, Option<PrefixData>)> {
self.drain_stream()?; let words_fst_mmap = self.word_fst_builder.build(&mut |bytes, deladd, is_modified| {
if let Some(prefix_fst_builder) = &mut self.prefix_fst_builder {
let words_fst_file = prefix_fst_builder.insert_word(bytes, deladd, is_modified)
self.word_fst_builder.into_inner()?.into_inner().map_err(|_| { } else {
InternalError::IndexingMergingKeys { process: "building-words-fst" } Ok(())
}
})?; })?;
let words_fst_mmap = unsafe { Mmap::map(&words_fst_file)? };
let prefix_data = self let prefix_data = self
.prefix_fst_builder .prefix_fst_builder