Remove last warning by storing rtxn and compressor on each thread

This commit is contained in:
Clément Renault 2024-12-17 17:33:53 +01:00
parent d2e628e3b5
commit 6cdae51626
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 28 additions and 8 deletions

View File

@ -2,9 +2,10 @@ use std::cell::RefCell;
use std::sync::atomic::{self, AtomicUsize}; use std::sync::atomic::{self, AtomicUsize};
use bumpalo::Bump; use bumpalo::Bump;
use heed::RwTxn; use heed::{RoTxn, RwTxn};
use rayon::iter::{ParallelBridge, ParallelIterator as _}; use rayon::iter::{ParallelBridge, ParallelIterator as _};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use zstd::bulk::Compressor;
use zstd::dict::{from_continuous, EncoderDictionary}; use zstd::dict::{from_continuous, EncoderDictionary};
use crate::heed_codec::CompressedObkvU16; use crate::heed_codec::CompressedObkvU16;
@ -116,17 +117,26 @@ where
let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?; let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?;
index.put_document_compression_dictionary(wtxn, &dictionary)?; index.put_document_compression_dictionary(wtxn, &dictionary)?;
let dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL); let encoder_dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL);
let all_documents = index.documents_ids(wtxn)?; let all_documents = index.documents_ids(wtxn)?;
let documents_to_compress = all_documents - all_documents_seen; let documents_to_compress = all_documents - all_documents_seen;
let datastore = ThreadLocal::with_capacity(rayon::max_num_threads());
let pi = documents_to_compress.into_iter().par_bridge().map(|docid| { let pi = documents_to_compress.into_iter().par_bridge().map(|docid| {
/// TODO put the rtxn and encoder dict in a ThreadLocal datastore let data = datastore.get_or_try(|| {
let rtxn = index.read_txn()?; crate::Result::Ok(RefCell::new(ParallelCompressionData {
let compressed_document = index.compressed_document(&rtxn, docid)?.unwrap(); rtxn: index.read_txn()?,
compressor: Compressor::with_dictionary(COMPRESSION_LEVEL, &dictionary)?,
}))
})?;
let mut data = data.borrow_mut_or_yield();
let ParallelCompressionData { rtxn, compressor } = &mut *data;
let compressed_document = index.compressed_document(rtxn, docid)?.unwrap();
// The documents are not compressed with any dictionary at this point. // The documents are not compressed with any dictionary at this point.
let document = compressed_document.as_non_compressed(); let document = compressed_document.as_non_compressed();
let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?; let compressed = CompressedObkvU16::with_compressor(document, compressor)?;
Ok((docid, compressed)) as crate::Result<_> Ok((docid, compressed)) as crate::Result<_>
}); });
@ -140,11 +150,19 @@ where
Ok(()) as crate::Result<_> Ok(()) as crate::Result<_>
})?; })?;
Ok(Some(dictionary)) Ok(Some(encoder_dictionary))
} }
} }
} }
/// Used when we are compressing documents in parallel.
struct ParallelCompressionData<'extractor> {
rtxn: RoTxn<'extractor>,
compressor: Compressor<'extractor>,
}
unsafe impl<'extractor> MostlySend for RefCell<ParallelCompressionData<'extractor>> {}
struct CompressorExtractor { struct CompressorExtractor {
/// The total number of documents we must extract from all threads. /// The total number of documents we must extract from all threads.
total_documents_to_extract: usize, total_documents_to_extract: usize,

View File

@ -27,7 +27,9 @@ impl Step for IndexingStep {
fn name(&self) -> Cow<'static, str> { fn name(&self) -> Cow<'static, str> {
match self { match self {
IndexingStep::PreparingPayloads => "preparing update file", IndexingStep::PreparingPayloads => "preparing update file",
IndexingStep::PreparingCompressionDictionary => "preparing documents compression dictionary", IndexingStep::PreparingCompressionDictionary => {
"preparing documents compression dictionary"
}
IndexingStep::ExtractingDocuments => "extracting documents", IndexingStep::ExtractingDocuments => "extracting documents",
IndexingStep::ExtractingFacets => "extracting facets", IndexingStep::ExtractingFacets => "extracting facets",
IndexingStep::ExtractingWords => "extracting words", IndexingStep::ExtractingWords => "extracting words",