Move the compression extractor into a dedicated module
This commit is contained in:
parent b7a9dbfdf8
commit 742709450d
crates/milli/src/update/new/extract/documents/compression.rs  185 lines  Normal file
@@ -0,0 +1,185 @@
+use std::cell::RefCell;
+use std::sync::atomic::{self, AtomicUsize};
+
+use bumpalo::Bump;
+use heed::RwTxn;
+use zstd::dict::{from_continuous, EncoderDictionary};
+
+use crate::update::new::document::Document as _;
+use crate::update::new::indexer::document_changes::{
+    DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
+};
+use crate::update::new::indexer::extract;
+use crate::update::new::ref_cell_ext::RefCellExt as _;
+use crate::update::new::steps::Step;
+use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
+use crate::update::new::DocumentChange;
+use crate::{Index, Result};
+
+/// The compression level to use when compressing documents.
+const COMPRESSION_LEVEL: i32 = 19;
+/// The number of documents required as a sample for generating
+/// the compression dictionary.
+const SAMPLE_SIZE: usize = 10_000;
+/// The maximum size the document compression dictionary can be.
+const DICTIONARY_MAX_SIZE: usize = 64_000;
+/// The maximum number of documents we accept to compress if they
+/// have not already been compressed in the database. If this threshold
+/// is reached, we do not generate a dictionary and continue as is.
+const COMPRESS_LIMIT: usize = 5_000_000;
+
+/// A function dedicated to use the existing or generate an appropriate
+/// document compression dictionay based on the documents available in
+/// the database and the ones in the payload.
+///
+/// If it has to compute a new compression dictionary it immediately
+/// writes the dictionary in the database and compresses the documents
+/// that are not part of the current update with it.
+///
+/// If there are too many documents already in the database and no
+/// compression dictionary we prefer not to generate a dictionary to avoid
+/// compressing all of the documents and potentially blow up disk space.
+pub fn retrieve_or_compute_document_compression_dictionary<'pl, 'extractor, DC, MSP, SP>(
+    index: &Index,
+    wtxn: &mut RwTxn<'_>,
+    document_changes: &DC,
+    indexing_context: IndexingContext<MSP, SP>,
+    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+) -> Result<Option<EncoderDictionary<'static>>>
+where
+    DC: DocumentChanges<'pl>,
+    MSP: Fn() -> bool + Sync,
+    SP: Fn(Progress) + Sync,
+{
+    let number_of_documents = index.number_of_documents(wtxn)? as usize;
+    match index.document_compression_raw_dictionary(wtxn)? {
+        Some(dict) => Ok(Some(EncoderDictionary::copy(dict, COMPRESSION_LEVEL))),
+        None if number_of_documents >= COMPRESS_LIMIT => Ok(None),
+        None if number_of_documents + document_changes.len() < SAMPLE_SIZE => Ok(None),
+        None => {
+            let mut sample_data = Vec::new();
+            let mut sample_sizes = Vec::new();
+            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+            let extractor = CompressorExtractor {
+                total_documents_to_extract: SAMPLE_SIZE,
+                extracted_documents_count: AtomicUsize::new(0),
+            };
+
+            // We first collect all the documents for the database into a buffer.
+            for result in index.all_compressed_documents(wtxn)? {
+                let (_docid, compressed_document) = result?;
+                // The documents are not compressed with any dictionary at this point.
+                let document = compressed_document.as_non_compressed();
+                sample_data.extend_from_slice(document.as_bytes());
+                sample_sizes.push(document.as_bytes().len());
+            }
+
+            // This extraction only takes care about documents replacements
+            // and not updates (merges). The merged documents are ignored as
+            // we will only use the previous version of them in the database,
+            // just above.
+            extract(
+                document_changes,
+                &extractor,
+                indexing_context,
+                extractor_allocs,
+                &datastore,
+                Step::PreparingCompressionDictionary,
+            )?;
+
+            for data in datastore {
+                let CompressorExtractorData { fields, fields_count, .. } = data.into_inner();
+                let mut fields_iter = fields.into_iter();
+                for field_count in fields_count {
+                    let mut document_fields_size = 0;
+                    for field in fields_iter.by_ref().take(field_count) {
+                        sample_data.extend_from_slice(field);
+                        document_fields_size += field.len();
+                    }
+                    sample_sizes.push(document_fields_size);
+                }
+
+                debug_assert_eq!(
+                    fields_iter.count(),
+                    0,
+                    "We must have consumed all the documents' \
+                     fields but there were some remaining ones"
+                );
+            }
+
+            let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?;
+            index.put_document_compression_dictionary(wtxn, &dictionary)?;
+
+            todo!("compress (in parallel) all the database documents that are not impacted by the current update");
+
+            Ok(Some(EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL)))
+        }
+    }
+}
+
+struct CompressorExtractor {
+    /// The total number of documents we must extract from all threads.
+    total_documents_to_extract: usize,
+    /// The combined, shared, number of extracted documents.
+    extracted_documents_count: AtomicUsize,
+}
+
+#[derive(Default)]
+struct CompressorExtractorData<'extractor> {
+    /// The field content in JSON but as bytes.
+    fields: Vec<&'extractor [u8]>,
+    /// The number of fields associated to single documents.
+    /// It is used to provide good sample to the dictionary generator.
+    fields_count: Vec<usize>,
+    /// We extracted the expected count of documents, we can skip everything now.
+    must_stop: bool,
+}
+
+unsafe impl<'extractor> MostlySend for RefCell<CompressorExtractorData<'extractor>> {}
+
+impl<'extractor> Extractor<'extractor> for CompressorExtractor {
+    type Data = RefCell<CompressorExtractorData<'extractor>>;
+
+    fn init_data<'doc>(
+        &'doc self,
+        _extractor_alloc: &'extractor bumpalo::Bump,
+    ) -> crate::Result<Self::Data> {
+        Ok(RefCell::new(CompressorExtractorData::default()))
+    }
+
+    fn process<'doc>(
+        &'doc self,
+        changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
+        context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
+    ) -> crate::Result<()> {
+        let mut data = context.data.borrow_mut_or_yield();
+
+        for change in changes {
+            if data.must_stop {
+                return Ok(());
+            }

+            let change = change?;
+            match change {
+                DocumentChange::Deletion(_) => (),
+                DocumentChange::Update(_) => (),
+                DocumentChange::Insertion(insertion) => {
+                    let mut fields_count = 0;
+                    for result in insertion.inserted().iter_top_level_fields() {
+                        let (_field_name, raw_value) = result?;
+                        let bytes = raw_value.get().as_bytes();
+                        data.fields.push(context.extractor_alloc.alloc_slice_copy(bytes));
+                        fields_count += 1;
+                    }
+
+                    let previous_count =
+                        self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
+                    data.must_stop = previous_count >= self.total_documents_to_extract;
+                    data.fields_count.push(fields_count);
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
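
Aside (not part of the commit): the new module leans on two zstd building blocks, dictionary training with `zstd::dict::from_continuous` over concatenated samples, and dictionary-based (de)compression. Below is a minimal, self-contained sketch of that flow using the `zstd` crate's `bulk` API; the sample documents and the `roundtrip_with_dictionary` helper are invented for illustration and do not appear in the codebase.

use std::io;

/// Train a small zstd dictionary from concatenated samples, then compress and
/// decompress one document with it. Purely illustrative.
fn roundtrip_with_dictionary() -> io::Result<()> {
    // `from_continuous` expects all samples concatenated in one buffer plus
    // the size of each individual sample, which is the layout the new
    // extractor builds from document fields.
    let samples: Vec<String> = (0..5_000)
        .map(|i| format!(r#"{{"id":{i},"title":"Document number {i}","price":{}}}"#, i % 100))
        .collect();
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for sample in &samples {
        sample_data.extend_from_slice(sample.as_bytes());
        sample_sizes.push(sample.len());
    }

    // 64_000 mirrors DICTIONARY_MAX_SIZE above; training can fail if the
    // samples are too few or too uniform, hence the `?`.
    let dictionary = zstd::dict::from_continuous(&sample_data, &sample_sizes, 64_000)?;

    // Compress one document with the trained dictionary (level 19, as in the diff).
    let document = br#"{"id":42,"title":"Document number 42","price":42}"#;
    let mut compressor = zstd::bulk::Compressor::with_dictionary(19, &dictionary)?;
    let compressed = compressor.compress(document)?;

    // Decompressing requires the same dictionary and an upper bound on the
    // decompressed size.
    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dictionary)?;
    let decompressed = decompressor.decompress(&compressed, document.len())?;
    assert_eq!(decompressed, document.to_vec());
    Ok(())
}

Note how the diff above pushes one size entry per document (the summed sizes of its top-level fields) into `sample_sizes`, so zstd weighs samples per document rather than per field.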
@@ -1,6 +1,7 @@
 use std::cell::RefCell;

 use bumpalo::Bump;
+pub use compression::retrieve_or_compute_document_compression_dictionary;
 use hashbrown::HashMap;

 use super::DelAddRoaringBitmap;
@@ -13,6 +14,8 @@ use crate::update::new::DocumentChange;
 use crate::vector::EmbeddingConfigs;
 use crate::Result;

+mod compression;
+
 pub struct DocumentsExtractor<'a, 'b> {
     document_sender: DocumentsSender<'a, 'b>,
     embedders: &'a EmbeddingConfigs,
@@ -50,7 +53,6 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
             // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
             // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
             let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();

             let external_docid = change.external_docid().to_owned();

             todo!("manage documents compression");
@@ -6,9 +6,7 @@ mod searchable;
 mod vectors;

 use bumpalo::Bump;
-pub use cache::{
-    merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
-};
+pub use cache::*;
 pub use documents::*;
 pub use faceted::*;
 pub use geo::*;
@@ -1,7 +1,3 @@
-mod extract_word_docids;
-mod extract_word_pair_proximity_docids;
-mod tokenize_document;
-
 use std::cell::RefCell;
 use std::marker::PhantomData;

@@ -22,6 +18,10 @@ use crate::update::new::DocumentChange;
 use crate::update::GrenadParameters;
 use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};

+mod extract_word_docids;
+mod extract_word_pair_proximity_docids;
+mod tokenize_document;
+
 pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
     tokenizer: &'a DocumentTokenizer<'a>,
     grenad_parameters: GrenadParameters,
@@ -93,6 +93,7 @@ mod test {
     use std::sync::RwLock;

     use bumpalo::Bump;
+    use zstd::dict::DecoderDictionary;

     use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
     use crate::index::tests::TempIndex;
@@ -144,7 +145,6 @@ mod test {
         let indexer = Bump::new();

         let index = TempIndex::new();

         let rtxn = index.read_txn().unwrap();

         let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
@@ -152,8 +152,13 @@ mod test {
         let fields_ids_map =
             RwLock::new(FieldIdMapWithMetadata::new(db_fields_ids_map.clone(), metadata_builder));

-        let fields_ids_map_store = ThreadLocal::new();
+        let db_document_decompression_dictionary =
+            match index.document_compression_raw_dictionary(&rtxn).unwrap() {
+                Some(dictionary) => Some(zstd::dict::DecoderDictionary::copy(dictionary)),
+                None => None,
+            };
+
+        let fields_ids_map_store = ThreadLocal::new();
         let mut extractor_allocs = ThreadLocal::new();
         let doc_allocs = ThreadLocal::new();

@@ -165,6 +170,7 @@ mod test {
         let context = IndexingContext {
             index: &index,
             db_fields_ids_map: &db_fields_ids_map,
+            db_document_decompression_dictionary: db_document_decompression_dictionary.as_ref(),
             new_fields_ids_map: &fields_ids_map,
             doc_allocs: &doc_allocs,
             fields_ids_map_store: &fields_ids_map_store,
@@ -1,15 +1,11 @@
-use std::cell::RefCell;
 use std::cmp::Ordering;
-use std::sync::atomic::{self, AtomicBool, AtomicUsize};
+use std::sync::atomic::{self, AtomicBool};
 use std::sync::{OnceLock, RwLock};
 use std::thread::{self, Builder};

 use big_s::S;
-use bumpalo::Bump;
 use bumparaw_collections::RawMap;
-use document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
-};
+pub use document_changes::{extract, DocumentChanges, IndexingContext, Progress};
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::{DocumentOperation, PayloadStats};
 use hashbrown::HashMap;
@@ -21,20 +17,19 @@ use rand::SeedableRng as _;
 use rustc_hash::FxBuildHasher;
 use time::OffsetDateTime;
 pub use update_by_function::UpdateByFunction;
-use zstd::dict::{DecoderDictionary, EncoderDictionary};
+use zstd::dict::DecoderDictionary;

 use super::channel::*;
-use super::document::Document as _;
 use super::extract::*;
 use super::facet_search_builder::FacetSearchBuilder;
 use super::merger::FacetFieldIdsDelta;
 use super::steps::IndexingStep;
-use super::thread_local::{FullySend, MostlySend, ThreadLocal};
+use super::thread_local::ThreadLocal;
 use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
 use super::words_prefix_docids::{
     compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
 };
-use super::{DocumentChange, StdResult};
+use super::StdResult;
 use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
 use crate::facet::FacetType;
 use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
@@ -44,7 +39,6 @@ use crate::proximity::ProximityPrecision;
 use crate::update::del_add::DelAdd;
 use crate::update::new::extract::EmbeddingExtractor;
 use crate::update::new::merger::merge_and_send_rtree;
-use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
@@ -141,10 +135,9 @@ where

     let document_compression_dictionary = pool
         .install(|| {
-            let rtxn = index.read_txn()?;
-            compute_document_compression_dictionary(
+            retrieve_or_compute_document_compression_dictionary(
                 index,
-                &rtxn,
+                wtxn,
                 document_changes,
                 indexing_context,
                 &mut extractor_allocs,
@@ -574,141 +567,6 @@ where
     Ok(())
 }

-/// The compression level to use when compressing documents.
-const DOCUMENT_COMPRESSION_LEVEL: i32 = 19;
-/// The sample size used to generate the document compression dictionary.
-const DOCUMENT_COMPRESSION_SAMPLE_SIZE: usize = 10_000;
-/// The maximum size the document compression dictionary can be.
-const DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE: usize = 64_000;
-/// The maximum number of documents we accept to compress if they
-/// weren't already compressed in the database. If this threshold
-/// is reached we do not generate a dictionary and continue as is.
-const DOCUMENT_COMPRESSION_COMPRESS_LIMIT: u64 = 5_000_000;
-
-/// A function dedicated to use the existing or generate an appropriate
-/// document compression dictionay based on the documents available in
-/// the database and the ones in the payload.
-///
-/// If there are too many documents already in the database and no
-/// compression dictionary we prefer not to generate a dictionary to avoid
-/// compressing all of the documents and potentially blow up disk space.
-fn compute_document_compression_dictionary<'pl, 'extractor, DC, MSP, SP>(
-    index: &Index,
-    rtxn: &RoTxn<'_>,
-    document_changes: &DC,
-    indexing_context: IndexingContext<MSP, SP>,
-    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-) -> Result<Option<EncoderDictionary<'static>>>
-where
-    DC: DocumentChanges<'pl>,
-    MSP: Fn() -> bool + Sync,
-    SP: Fn(Progress) + Sync,
-{
-    match index.document_compression_raw_dictionary(rtxn)? {
-        Some(dict) => Ok(Some(EncoderDictionary::copy(dict, DOCUMENT_COMPRESSION_LEVEL))),
-        None if index.number_of_documents(rtxn)? >= DOCUMENT_COMPRESSION_COMPRESS_LIMIT => Ok(None),
-        None => {
-            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            let extractor = CompressorExtractor {
-                total_documents_to_extract: DOCUMENT_COMPRESSION_SAMPLE_SIZE,
-                extracted_documents_count: AtomicUsize::new(0),
-            };
-
-            todo!("collect the documents samples from the database first (or after)");
-
-            // This extraction only takes care about documents replacement
-            // and not update (merges). The merged documents are ignore as
-            // we will only use the previous version of them in the database.
-            extract(
-                document_changes,
-                &extractor,
-                indexing_context,
-                extractor_allocs,
-                &datastore,
-                Step::PreparingCompressionDictionary,
-            )?;
-
-            let mut sample_data = Vec::new();
-            let mut sample_sizes = Vec::new();
-            for data in datastore {
-                let CompressorExtractorData { buffer, must_stop: _ } = data.into_inner();
-                let mut subsample_size = 0;
-                for subsample in buffer {
-                    sample_data.extend_from_slice(subsample);
-                    subsample_size += subsample.len();
-                }
-                sample_sizes.push(subsample_size);
-            }
-
-            let dictionary = zstd::dict::from_continuous(
-                &sample_data,
-                &sample_sizes,
-                DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE,
-            )?;
-
-            Ok(Some(EncoderDictionary::copy(&dictionary, DOCUMENT_COMPRESSION_LEVEL)))
-        }
-    }
-}
-
-struct CompressorExtractor {
-    total_documents_to_extract: usize,
-    extracted_documents_count: AtomicUsize,
-}
-
-#[derive(Default)]
-struct CompressorExtractorData<'extractor> {
-    buffer: Vec<&'extractor [u8]>,
-    /// We extracted the expected count of documents, we can skip everything now.
-    must_stop: bool,
-}
-
-unsafe impl<'extractor> MostlySend for RefCell<CompressorExtractorData<'extractor>> {}
-
-impl<'extractor> Extractor<'extractor> for CompressorExtractor {
-    type Data = RefCell<CompressorExtractorData<'extractor>>;
-
-    fn init_data<'doc>(
-        &'doc self,
-        _extractor_alloc: &'extractor bumpalo::Bump,
-    ) -> crate::Result<Self::Data> {
-        Ok(RefCell::new(CompressorExtractorData::default()))
-    }
-
-    fn process<'doc>(
-        &'doc self,
-        changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
-    ) -> crate::Result<()> {
-        let mut data = context.data.borrow_mut_or_yield();
-
-        for change in changes {
-            if data.must_stop {
-                return Ok(());
-            }
-
-            let change = change?;
-            match change {
-                DocumentChange::Deletion(_) => (),
-                DocumentChange::Update(_) => (),
-                DocumentChange::Insertion(insertion) => {
-                    for result in insertion.inserted().iter_top_level_fields() {
-                        let (_field_name, raw_value) = result?;
-                        let bytes = raw_value.get().as_bytes();
-                        data.buffer.push(context.extractor_alloc.alloc_slice_copy(bytes));
-                    }
-
-                    let previous_count =
-                        self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
-                    data.must_stop = previous_count >= self.total_documents_to_extract;
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
-
 /// A function dedicated to manage all the available BBQueue frames.
 ///
 /// It reads all the available frames, do the corresponding database operations
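
Aside (not part of the commit): the new module still leaves a `todo!` for compressing the documents already stored in the database once a dictionary has been trained. As a rough illustration of what such a parallel pass could look like, here is a hedged sketch using rayon's `map_init` so that each worker thread builds its zstd `Compressor` once; the `compress_all_documents` helper and the plain `Vec<(u32, Vec<u8>)>` standing in for the real LMDB iteration are invented for this example.

use std::io;

use rayon::prelude::*;

/// Compress every (docid, raw document) pair with a shared dictionary.
/// Illustrative only: the real code would stream documents out of and back
/// into LMDB instead of materializing them in a Vec.
fn compress_all_documents(
    documents: Vec<(u32, Vec<u8>)>,
    dictionary: &[u8],
    level: i32,
) -> io::Result<Vec<(u32, Vec<u8>)>> {
    documents
        .into_par_iter()
        .map_init(
            // One compressor per rayon worker thread, reused across documents.
            || zstd::bulk::Compressor::with_dictionary(level, dictionary),
            |compressor, (docid, raw)| match compressor {
                Ok(compressor) => Ok((docid, compressor.compress(&raw)?)),
                Err(error) => Err(io::Error::new(error.kind(), error.to_string())),
            },
        )
        .collect()
}

Reusing a per-thread compressor avoids rebuilding the dictionary tables for every document while keeping the pass lock-free.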