mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 22:14:32 +01:00
Store the first word positions levels
This commit is contained in:
parent
b0a417f342
commit
9242f2f1d4
@ -52,6 +52,10 @@ pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -
|
|||||||
cbo_roaring_bitmap_merge(values)
|
cbo_roaring_bitmap_merge(values)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn word_level_position_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
|
||||||
|
cbo_roaring_bitmap_merge(values)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn facet_field_value_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
|
pub fn facet_field_value_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
|
||||||
cbo_roaring_bitmap_merge(values)
|
cbo_roaring_bitmap_merge(values)
|
||||||
}
|
}
|
||||||
|
@ -18,11 +18,12 @@ use rayon::prelude::*;
|
|||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
use crate::index::Index;
|
use crate::index::Index;
|
||||||
use crate::update::{Facets, WordsPrefixes, UpdateIndexingStep};
|
use crate::update::{Facets, WordsLevelPositions, WordsPrefixes, UpdateIndexingStep};
|
||||||
use self::store::{Store, Readers};
|
use self::store::{Store, Readers};
|
||||||
pub use self::merge_function::{
|
pub use self::merge_function::{
|
||||||
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
|
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
|
||||||
docid_word_positions_merge, documents_merge, facet_field_value_docids_merge,
|
docid_word_positions_merge, documents_merge,
|
||||||
|
word_level_position_docids_merge, facet_field_value_docids_merge,
|
||||||
field_id_docid_facet_values_merge,
|
field_id_docid_facet_values_merge,
|
||||||
};
|
};
|
||||||
pub use self::transform::{Transform, TransformOutput};
|
pub use self::transform::{Transform, TransformOutput};
|
||||||
@ -402,6 +403,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
enum DatabaseType {
|
enum DatabaseType {
|
||||||
Main,
|
Main,
|
||||||
WordDocids,
|
WordDocids,
|
||||||
|
WordLevel0PositionDocids,
|
||||||
FacetLevel0ValuesDocids,
|
FacetLevel0ValuesDocids,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -467,6 +469,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
let mut word_docids_readers = Vec::with_capacity(readers.len());
|
let mut word_docids_readers = Vec::with_capacity(readers.len());
|
||||||
let mut docid_word_positions_readers = Vec::with_capacity(readers.len());
|
let mut docid_word_positions_readers = Vec::with_capacity(readers.len());
|
||||||
let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len());
|
let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len());
|
||||||
|
let mut word_level_position_docids_readers = Vec::with_capacity(readers.len());
|
||||||
let mut facet_field_value_docids_readers = Vec::with_capacity(readers.len());
|
let mut facet_field_value_docids_readers = Vec::with_capacity(readers.len());
|
||||||
let mut field_id_docid_facet_values_readers = Vec::with_capacity(readers.len());
|
let mut field_id_docid_facet_values_readers = Vec::with_capacity(readers.len());
|
||||||
let mut documents_readers = Vec::with_capacity(readers.len());
|
let mut documents_readers = Vec::with_capacity(readers.len());
|
||||||
@ -476,6 +479,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
word_docids,
|
word_docids,
|
||||||
docid_word_positions,
|
docid_word_positions,
|
||||||
words_pairs_proximities_docids,
|
words_pairs_proximities_docids,
|
||||||
|
word_level_position_docids,
|
||||||
facet_field_value_docids,
|
facet_field_value_docids,
|
||||||
field_id_docid_facet_values,
|
field_id_docid_facet_values,
|
||||||
documents
|
documents
|
||||||
@ -484,6 +488,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
word_docids_readers.push(word_docids);
|
word_docids_readers.push(word_docids);
|
||||||
docid_word_positions_readers.push(docid_word_positions);
|
docid_word_positions_readers.push(docid_word_positions);
|
||||||
words_pairs_proximities_docids_readers.push(words_pairs_proximities_docids);
|
words_pairs_proximities_docids_readers.push(words_pairs_proximities_docids);
|
||||||
|
word_level_position_docids_readers.push(word_level_position_docids);
|
||||||
facet_field_value_docids_readers.push(facet_field_value_docids);
|
facet_field_value_docids_readers.push(facet_field_value_docids);
|
||||||
field_id_docid_facet_values_readers.push(field_id_docid_facet_values);
|
field_id_docid_facet_values_readers.push(field_id_docid_facet_values);
|
||||||
documents_readers.push(documents);
|
documents_readers.push(documents);
|
||||||
@ -514,6 +519,11 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
facet_field_value_docids_readers,
|
facet_field_value_docids_readers,
|
||||||
facet_field_value_docids_merge,
|
facet_field_value_docids_merge,
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
DatabaseType::WordLevel0PositionDocids,
|
||||||
|
word_level_position_docids_readers,
|
||||||
|
word_level_position_docids_merge,
|
||||||
|
),
|
||||||
]
|
]
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.for_each(|(dbtype, readers, merge)| {
|
.for_each(|(dbtype, readers, merge)| {
|
||||||
@ -569,7 +579,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
self.index.put_documents_ids(self.wtxn, &documents_ids)?;
|
self.index.put_documents_ids(self.wtxn, &documents_ids)?;
|
||||||
|
|
||||||
let mut database_count = 0;
|
let mut database_count = 0;
|
||||||
let total_databases = 7;
|
let total_databases = 8;
|
||||||
|
|
||||||
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
|
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
|
||||||
databases_seen: 0,
|
databases_seen: 0,
|
||||||
@ -661,7 +671,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
)?;
|
)?;
|
||||||
},
|
},
|
||||||
DatabaseType::FacetLevel0ValuesDocids => {
|
DatabaseType::FacetLevel0ValuesDocids => {
|
||||||
debug!("Writing the facet values docids into LMDB on disk...");
|
debug!("Writing the facet level 0 values docids into LMDB on disk...");
|
||||||
let db = *self.index.facet_field_id_value_docids.as_polymorph();
|
let db = *self.index.facet_field_id_value_docids.as_polymorph();
|
||||||
write_into_lmdb_database(
|
write_into_lmdb_database(
|
||||||
self.wtxn,
|
self.wtxn,
|
||||||
@ -671,6 +681,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
write_method,
|
write_method,
|
||||||
)?;
|
)?;
|
||||||
},
|
},
|
||||||
|
DatabaseType::WordLevel0PositionDocids => {
|
||||||
|
debug!("Writing the word level 0 positions docids into LMDB on disk...");
|
||||||
|
let db = *self.index.word_level_position_docids.as_polymorph();
|
||||||
|
write_into_lmdb_database(
|
||||||
|
self.wtxn,
|
||||||
|
db,
|
||||||
|
content,
|
||||||
|
word_level_position_docids_merge,
|
||||||
|
write_method,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
database_count += 1;
|
database_count += 1;
|
||||||
@ -693,6 +714,19 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
|||||||
}
|
}
|
||||||
builder.execute()?;
|
builder.execute()?;
|
||||||
|
|
||||||
|
// Run the words positions update operation.
|
||||||
|
let mut builder = WordsLevelPositions::new(self.wtxn, self.index, self.update_id);
|
||||||
|
builder.chunk_compression_type = self.chunk_compression_type;
|
||||||
|
builder.chunk_compression_level = self.chunk_compression_level;
|
||||||
|
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
|
||||||
|
if let Some(value) = self.facet_level_group_size {
|
||||||
|
builder.level_group_size(value);
|
||||||
|
}
|
||||||
|
if let Some(value) = self.facet_min_level_size {
|
||||||
|
builder.min_level_size(value);
|
||||||
|
}
|
||||||
|
builder.execute()?;
|
||||||
|
|
||||||
// Run the words prefixes update operation.
|
// Run the words prefixes update operation.
|
||||||
let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id);
|
let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id);
|
||||||
builder.chunk_compression_type = self.chunk_compression_type;
|
builder.chunk_compression_type = self.chunk_compression_type;
|
||||||
|
@ -29,7 +29,8 @@ use crate::{json_to_string, SmallVec8, SmallVec32, Position, DocumentId, FieldId
|
|||||||
use super::{MergeFn, create_writer, create_sorter, writer_into_reader};
|
use super::{MergeFn, create_writer, create_sorter, writer_into_reader};
|
||||||
use super::merge_function::{
|
use super::merge_function::{
|
||||||
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
|
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
|
||||||
facet_field_value_docids_merge, field_id_docid_facet_values_merge,
|
word_level_position_docids_merge, facet_field_value_docids_merge,
|
||||||
|
field_id_docid_facet_values_merge,
|
||||||
};
|
};
|
||||||
|
|
||||||
const LMDB_MAX_KEY_LENGTH: usize = 511;
|
const LMDB_MAX_KEY_LENGTH: usize = 511;
|
||||||
@ -43,6 +44,7 @@ pub struct Readers {
|
|||||||
pub word_docids: Reader<FileFuse>,
|
pub word_docids: Reader<FileFuse>,
|
||||||
pub docid_word_positions: Reader<FileFuse>,
|
pub docid_word_positions: Reader<FileFuse>,
|
||||||
pub words_pairs_proximities_docids: Reader<FileFuse>,
|
pub words_pairs_proximities_docids: Reader<FileFuse>,
|
||||||
|
pub word_level_position_docids: Reader<FileFuse>,
|
||||||
pub facet_field_value_docids: Reader<FileFuse>,
|
pub facet_field_value_docids: Reader<FileFuse>,
|
||||||
pub field_id_docid_facet_values: Reader<FileFuse>,
|
pub field_id_docid_facet_values: Reader<FileFuse>,
|
||||||
pub documents: Reader<FileFuse>,
|
pub documents: Reader<FileFuse>,
|
||||||
@ -69,6 +71,7 @@ pub struct Store<'s, A> {
|
|||||||
main_sorter: Sorter<MergeFn>,
|
main_sorter: Sorter<MergeFn>,
|
||||||
word_docids_sorter: Sorter<MergeFn>,
|
word_docids_sorter: Sorter<MergeFn>,
|
||||||
words_pairs_proximities_docids_sorter: Sorter<MergeFn>,
|
words_pairs_proximities_docids_sorter: Sorter<MergeFn>,
|
||||||
|
word_level_position_docids_sorter: Sorter<MergeFn>,
|
||||||
facet_field_value_docids_sorter: Sorter<MergeFn>,
|
facet_field_value_docids_sorter: Sorter<MergeFn>,
|
||||||
field_id_docid_facet_values_sorter: Sorter<MergeFn>,
|
field_id_docid_facet_values_sorter: Sorter<MergeFn>,
|
||||||
// MTBL writers
|
// MTBL writers
|
||||||
@ -94,7 +97,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
) -> anyhow::Result<Self>
|
) -> anyhow::Result<Self>
|
||||||
{
|
{
|
||||||
// We divide the max memory by the number of sorter the Store have.
|
// We divide the max memory by the number of sorter the Store have.
|
||||||
let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 4));
|
let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 5));
|
||||||
let linked_hash_map_size = linked_hash_map_size.unwrap_or(500);
|
let linked_hash_map_size = linked_hash_map_size.unwrap_or(500);
|
||||||
|
|
||||||
let main_sorter = create_sorter(
|
let main_sorter = create_sorter(
|
||||||
@ -121,6 +124,14 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
max_nb_chunks,
|
max_nb_chunks,
|
||||||
max_memory,
|
max_memory,
|
||||||
);
|
);
|
||||||
|
let word_level_position_docids_sorter = create_sorter(
|
||||||
|
word_level_position_docids_merge,
|
||||||
|
chunk_compression_type,
|
||||||
|
chunk_compression_level,
|
||||||
|
chunk_fusing_shrink_size,
|
||||||
|
max_nb_chunks,
|
||||||
|
max_memory,
|
||||||
|
);
|
||||||
let facet_field_value_docids_sorter = create_sorter(
|
let facet_field_value_docids_sorter = create_sorter(
|
||||||
facet_field_value_docids_merge,
|
facet_field_value_docids_merge,
|
||||||
chunk_compression_type,
|
chunk_compression_type,
|
||||||
@ -172,6 +183,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
main_sorter,
|
main_sorter,
|
||||||
word_docids_sorter,
|
word_docids_sorter,
|
||||||
words_pairs_proximities_docids_sorter,
|
words_pairs_proximities_docids_sorter,
|
||||||
|
word_level_position_docids_sorter,
|
||||||
facet_field_value_docids_sorter,
|
facet_field_value_docids_sorter,
|
||||||
field_id_docid_facet_values_sorter,
|
field_id_docid_facet_values_sorter,
|
||||||
// MTBL writers
|
// MTBL writers
|
||||||
@ -290,6 +302,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
|
|
||||||
self.documents_writer.insert(document_id.to_be_bytes(), record)?;
|
self.documents_writer.insert(document_id.to_be_bytes(), record)?;
|
||||||
Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?;
|
Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?;
|
||||||
|
Self::write_word_position_docids(&mut self.word_level_position_docids_sorter, document_id, words_positions)?;
|
||||||
|
|
||||||
words_positions.clear();
|
words_positions.clear();
|
||||||
|
|
||||||
@ -360,6 +373,42 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn write_word_position_docids(
|
||||||
|
writer: &mut Sorter<MergeFn>,
|
||||||
|
document_id: DocumentId,
|
||||||
|
words_positions: &HashMap<String, SmallVec32<Position>>,
|
||||||
|
) -> anyhow::Result<()>
|
||||||
|
{
|
||||||
|
let mut key_buffer = Vec::new();
|
||||||
|
let mut data_buffer = Vec::new();
|
||||||
|
|
||||||
|
for (word, positions) in words_positions {
|
||||||
|
key_buffer.clear();
|
||||||
|
key_buffer.extend_from_slice(word.as_bytes());
|
||||||
|
key_buffer.push(0); // level 0
|
||||||
|
|
||||||
|
for position in positions {
|
||||||
|
key_buffer.truncate(word.len());
|
||||||
|
let position_bytes = position.to_be_bytes();
|
||||||
|
key_buffer.extend_from_slice(position_bytes.as_bytes());
|
||||||
|
key_buffer.extend_from_slice(position_bytes.as_bytes());
|
||||||
|
|
||||||
|
data_buffer.clear();
|
||||||
|
let positions = RoaringBitmap::from_iter(Some(document_id));
|
||||||
|
// We serialize the positions into a buffer.
|
||||||
|
CboRoaringBitmapCodec::serialize_into(&positions, &mut data_buffer)
|
||||||
|
.with_context(|| "could not serialize positions")?;
|
||||||
|
|
||||||
|
// that we write under the generated key into MTBL
|
||||||
|
if lmdb_key_valid_size(&key_buffer) {
|
||||||
|
writer.insert(&key_buffer, &data_buffer)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn write_facet_field_value_docids<I>(
|
fn write_facet_field_value_docids<I>(
|
||||||
sorter: &mut Sorter<MergeFn>,
|
sorter: &mut Sorter<MergeFn>,
|
||||||
iter: I,
|
iter: I,
|
||||||
@ -561,6 +610,9 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
||||||
self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?;
|
self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?;
|
||||||
|
|
||||||
|
let mut word_level_position_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
||||||
|
self.word_level_position_docids_sorter.write_into(&mut word_level_position_docids_wtr)?;
|
||||||
|
|
||||||
let mut facet_field_value_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
let mut facet_field_value_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
||||||
self.facet_field_value_docids_sorter.write_into(&mut facet_field_value_docids_wtr)?;
|
self.facet_field_value_docids_sorter.write_into(&mut facet_field_value_docids_wtr)?;
|
||||||
|
|
||||||
@ -570,6 +622,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
let main = writer_into_reader(main_wtr, shrink_size)?;
|
let main = writer_into_reader(main_wtr, shrink_size)?;
|
||||||
let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?;
|
let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?;
|
||||||
let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?;
|
let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?;
|
||||||
|
let word_level_position_docids = writer_into_reader(word_level_position_docids_wtr, shrink_size)?;
|
||||||
let facet_field_value_docids = writer_into_reader(facet_field_value_docids_wtr, shrink_size)?;
|
let facet_field_value_docids = writer_into_reader(facet_field_value_docids_wtr, shrink_size)?;
|
||||||
let field_id_docid_facet_values = writer_into_reader(field_id_docid_facet_values_wtr, shrink_size)?;
|
let field_id_docid_facet_values = writer_into_reader(field_id_docid_facet_values_wtr, shrink_size)?;
|
||||||
let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?;
|
let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?;
|
||||||
@ -580,6 +633,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
|
|||||||
word_docids,
|
word_docids,
|
||||||
docid_word_positions,
|
docid_word_positions,
|
||||||
words_pairs_proximities_docids,
|
words_pairs_proximities_docids,
|
||||||
|
word_level_position_docids,
|
||||||
facet_field_value_docids,
|
facet_field_value_docids,
|
||||||
field_id_docid_facet_values,
|
field_id_docid_facet_values,
|
||||||
documents,
|
documents,
|
||||||
|
@ -6,6 +6,7 @@ pub use self::index_documents::{DocumentAdditionResult, IndexDocuments, IndexDoc
|
|||||||
pub use self::settings::{Setting, Settings};
|
pub use self::settings::{Setting, Settings};
|
||||||
pub use self::update_builder::UpdateBuilder;
|
pub use self::update_builder::UpdateBuilder;
|
||||||
pub use self::update_step::UpdateIndexingStep;
|
pub use self::update_step::UpdateIndexingStep;
|
||||||
|
pub use self::words_level_positions::WordsLevelPositions;
|
||||||
pub use self::words_prefixes::WordsPrefixes;
|
pub use self::words_prefixes::WordsPrefixes;
|
||||||
|
|
||||||
mod available_documents_ids;
|
mod available_documents_ids;
|
||||||
@ -16,5 +17,6 @@ mod index_documents;
|
|||||||
mod settings;
|
mod settings;
|
||||||
mod update_builder;
|
mod update_builder;
|
||||||
mod update_step;
|
mod update_step;
|
||||||
|
mod words_level_positions;
|
||||||
mod words_prefixes;
|
mod words_prefixes;
|
||||||
|
|
||||||
|
184
milli/src/update/words_level_positions.rs
Normal file
184
milli/src/update/words_level_positions.rs
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
use std::cmp;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
|
||||||
|
use grenad::{CompressionType, Reader, Writer, FileFuse};
|
||||||
|
use heed::types::{ByteSlice, DecodeIgnore};
|
||||||
|
use heed::{BytesEncode, Error};
|
||||||
|
use log::debug;
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
|
use crate::facet::FacetType;
|
||||||
|
use crate::heed_codec::{StrLevelPositionCodec, CboRoaringBitmapCodec};
|
||||||
|
use crate::Index;
|
||||||
|
use crate::update::index_documents::WriteMethod;
|
||||||
|
use crate::update::index_documents::{create_writer, writer_into_reader, write_into_lmdb_database};
|
||||||
|
|
||||||
|
pub struct WordsLevelPositions<'t, 'u, 'i> {
|
||||||
|
wtxn: &'t mut heed::RwTxn<'i, 'u>,
|
||||||
|
index: &'i Index,
|
||||||
|
pub(crate) chunk_compression_type: CompressionType,
|
||||||
|
pub(crate) chunk_compression_level: Option<u32>,
|
||||||
|
pub(crate) chunk_fusing_shrink_size: Option<u64>,
|
||||||
|
level_group_size: NonZeroUsize,
|
||||||
|
min_level_size: NonZeroUsize,
|
||||||
|
_update_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
|
||||||
|
pub fn new(
|
||||||
|
wtxn: &'t mut heed::RwTxn<'i, 'u>,
|
||||||
|
index: &'i Index,
|
||||||
|
update_id: u64,
|
||||||
|
) -> WordsLevelPositions<'t, 'u, 'i>
|
||||||
|
{
|
||||||
|
WordsLevelPositions {
|
||||||
|
wtxn,
|
||||||
|
index,
|
||||||
|
chunk_compression_type: CompressionType::None,
|
||||||
|
chunk_compression_level: None,
|
||||||
|
chunk_fusing_shrink_size: None,
|
||||||
|
level_group_size: NonZeroUsize::new(4).unwrap(),
|
||||||
|
min_level_size: NonZeroUsize::new(5).unwrap(),
|
||||||
|
_update_id: update_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn level_group_size(&mut self, value: NonZeroUsize) -> &mut Self {
|
||||||
|
self.level_group_size = NonZeroUsize::new(cmp::max(value.get(), 2)).unwrap();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn min_level_size(&mut self, value: NonZeroUsize) -> &mut Self {
|
||||||
|
self.min_level_size = value;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn execute(self) -> anyhow::Result<()> {
|
||||||
|
debug!("Computing and writing the word levels positions docids into LMDB on disk...");
|
||||||
|
|
||||||
|
clear_non_zero_levels_positions(self.wtxn, self.index.word_level_position_docids)?;
|
||||||
|
|
||||||
|
let entries = compute_positions_levels(
|
||||||
|
self.wtxn,
|
||||||
|
self.index.word_level_position_docids,
|
||||||
|
self.chunk_compression_type,
|
||||||
|
self.chunk_compression_level,
|
||||||
|
self.chunk_fusing_shrink_size,
|
||||||
|
self.level_group_size,
|
||||||
|
self.min_level_size,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
write_into_lmdb_database(
|
||||||
|
self.wtxn,
|
||||||
|
*self.index.facet_field_id_value_docids.as_polymorph(),
|
||||||
|
entries,
|
||||||
|
|_, _| anyhow::bail!("invalid facet level merging"),
|
||||||
|
WriteMethod::GetMergePut,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear_non_zero_levels_positions(
|
||||||
|
wtxn: &mut heed::RwTxn,
|
||||||
|
db: heed::Database<StrLevelPositionCodec, CboRoaringBitmapCodec>,
|
||||||
|
) -> heed::Result<()>
|
||||||
|
{
|
||||||
|
let mut iter = db.iter_mut(wtxn)?.lazily_decode_data();
|
||||||
|
while let Some(result) = iter.next() {
|
||||||
|
let ((_, level, _, _), _) = result?;
|
||||||
|
if level != 0 {
|
||||||
|
iter.del_current()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generates all the words positions levels (including the level zero).
|
||||||
|
fn compute_positions_levels(
|
||||||
|
rtxn: &heed::RoTxn,
|
||||||
|
db: heed::Database<StrLevelPositionCodec, CboRoaringBitmapCodec>,
|
||||||
|
compression_type: CompressionType,
|
||||||
|
compression_level: Option<u32>,
|
||||||
|
shrink_size: Option<u64>,
|
||||||
|
level_group_size: NonZeroUsize,
|
||||||
|
min_level_size: NonZeroUsize,
|
||||||
|
) -> anyhow::Result<Reader<FileFuse>>
|
||||||
|
{
|
||||||
|
// let first_level_size = db.prefix_iter(rtxn, &[field_id])?
|
||||||
|
// .remap_types::<DecodeIgnore, DecodeIgnore>()
|
||||||
|
// .fold(Ok(0usize), |count, result| result.and(count).map(|c| c + 1))?;
|
||||||
|
|
||||||
|
// // It is forbidden to keep a cursor and write in a database at the same time with LMDB
|
||||||
|
// // therefore we write the facet levels entries into a grenad file before transfering them.
|
||||||
|
// let mut writer = tempfile::tempfile().and_then(|file| {
|
||||||
|
// create_writer(compression_type, compression_level, file)
|
||||||
|
// })?;
|
||||||
|
|
||||||
|
// let level_0_range = {
|
||||||
|
// let left = (field_id, 0, T::min_value(), T::min_value());
|
||||||
|
// let right = (field_id, 0, T::max_value(), T::max_value());
|
||||||
|
// left..=right
|
||||||
|
// };
|
||||||
|
|
||||||
|
// // Groups sizes are always a power of the original level_group_size and therefore a group
|
||||||
|
// // always maps groups of the previous level and never splits previous levels groups in half.
|
||||||
|
// let group_size_iter = (1u8..)
|
||||||
|
// .map(|l| (l, level_group_size.get().pow(l as u32)))
|
||||||
|
// .take_while(|(_, s)| first_level_size / *s >= min_level_size.get());
|
||||||
|
|
||||||
|
// for (level, group_size) in group_size_iter {
|
||||||
|
// let mut left = T::zero();
|
||||||
|
// let mut right = T::zero();
|
||||||
|
// let mut group_docids = RoaringBitmap::new();
|
||||||
|
|
||||||
|
// let db = db.remap_key_type::<KC>();
|
||||||
|
// for (i, result) in db.range(rtxn, &level_0_range)?.enumerate() {
|
||||||
|
// let ((_field_id, _level, value, _right), docids) = result?;
|
||||||
|
|
||||||
|
// if i == 0 {
|
||||||
|
// left = value;
|
||||||
|
// } else if i % group_size == 0 {
|
||||||
|
// // we found the first bound of the next group, we must store the left
|
||||||
|
// // and right bounds associated with the docids.
|
||||||
|
// write_entry::<T, KC>(&mut writer, field_id, level, left, right, &group_docids)?;
|
||||||
|
|
||||||
|
// // We save the left bound for the new group and also reset the docids.
|
||||||
|
// group_docids = RoaringBitmap::new();
|
||||||
|
// left = value;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // The right bound is always the bound we run through.
|
||||||
|
// group_docids.union_with(&docids);
|
||||||
|
// right = value;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if !group_docids.is_empty() {
|
||||||
|
// write_entry::<T, KC>(&mut writer, field_id, level, left, right, &group_docids)?;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// writer_into_reader(writer, shrink_size)
|
||||||
|
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_entry<T, KC>(
|
||||||
|
writer: &mut Writer<File>,
|
||||||
|
field_id: u8,
|
||||||
|
level: u8,
|
||||||
|
left: T,
|
||||||
|
right: T,
|
||||||
|
ids: &RoaringBitmap,
|
||||||
|
) -> anyhow::Result<()>
|
||||||
|
where
|
||||||
|
KC: for<'x> heed::BytesEncode<'x, EItem = (u8, u8, T, T)>,
|
||||||
|
{
|
||||||
|
let key = (field_id, level, left, right);
|
||||||
|
let key = KC::bytes_encode(&key).ok_or(Error::Encoding)?;
|
||||||
|
let data = CboRoaringBitmapCodec::bytes_encode(&ids).ok_or(Error::Encoding)?;
|
||||||
|
writer.insert(&key, &data)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user