From 6cdae51626e5b231fb85a4b7d9019e219e2e8ea3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 17 Dec 2024 17:33:53 +0100 Subject: [PATCH] Remove last warning by storing rtxn and compressor on each thread --- .../new/extract/documents/compression.rs | 32 +++++++++++++++---- crates/milli/src/update/new/steps.rs | 4 ++- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/crates/milli/src/update/new/extract/documents/compression.rs b/crates/milli/src/update/new/extract/documents/compression.rs index 6ad11ffc1..24bd5ee47 100644 --- a/crates/milli/src/update/new/extract/documents/compression.rs +++ b/crates/milli/src/update/new/extract/documents/compression.rs @@ -2,9 +2,10 @@ use std::cell::RefCell; use std::sync::atomic::{self, AtomicUsize}; use bumpalo::Bump; -use heed::RwTxn; +use heed::{RoTxn, RwTxn}; use rayon::iter::{ParallelBridge, ParallelIterator as _}; use roaring::RoaringBitmap; +use zstd::bulk::Compressor; use zstd::dict::{from_continuous, EncoderDictionary}; use crate::heed_codec::CompressedObkvU16; @@ -116,17 +117,26 @@ where let dictionary = from_continuous(&sample_data, &sample_sizes, DICTIONARY_MAX_SIZE)?; index.put_document_compression_dictionary(wtxn, &dictionary)?; - let dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL); + let encoder_dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL); let all_documents = index.documents_ids(wtxn)?; let documents_to_compress = all_documents - all_documents_seen; + let datastore = ThreadLocal::with_capacity(rayon::max_num_threads()); let pi = documents_to_compress.into_iter().par_bridge().map(|docid| { - /// TODO put the rtxn and encoder dict in a ThreadLocal datastore - let rtxn = index.read_txn()?; - let compressed_document = index.compressed_document(&rtxn, docid)?.unwrap(); + let data = datastore.get_or_try(|| { + crate::Result::Ok(RefCell::new(ParallelCompressionData { + rtxn: index.read_txn()?, + compressor: 
Compressor::with_dictionary(COMPRESSION_LEVEL, &dictionary)?, })) })?; + + let mut data = data.borrow_mut_or_yield(); + let ParallelCompressionData { rtxn, compressor } = &mut *data; + + let compressed_document = index.compressed_document(rtxn, docid)?.unwrap(); // The documents are not compressed with any dictionary at this point. let document = compressed_document.as_non_compressed(); - let compressed = CompressedObkvU16::with_dictionary(document, &dictionary)?; + let compressed = CompressedObkvU16::with_compressor(document, compressor)?; Ok((docid, compressed)) as crate::Result<_> }); @@ -140,11 +150,19 @@ where Ok(()) as crate::Result<_> })?; - Ok(Some(dictionary)) + Ok(Some(encoder_dictionary)) } } } +/// Used when we are compressing documents in parallel. +struct ParallelCompressionData<'extractor> { + rtxn: RoTxn<'extractor>, + compressor: Compressor<'extractor>, +} + +unsafe impl<'extractor> MostlySend for RefCell<ParallelCompressionData<'extractor>> {} + struct CompressorExtractor { /// The total number of documents we must extract from all threads. total_documents_to_extract: usize, diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index 16912b3d5..c988e0dfb 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -27,7 +27,9 @@ impl Step for IndexingStep { fn name(&self) -> Cow<'static, str> { match self { IndexingStep::PreparingPayloads => "preparing update file", - IndexingStep::PreparingCompressionDictionary => "preparing documents compression dictionary", + IndexingStep::PreparingCompressionDictionary => { + "preparing documents compression dictionary" + } IndexingStep::ExtractingDocuments => "extracting documents", IndexingStep::ExtractingFacets => "extracting facets", IndexingStep::ExtractingWords => "extracting words",