diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs
index 5cd54a6bc..dd0ab417e 100644
--- a/milli/src/update/new/channel.rs
+++ b/milli/src/update/new/channel.rs
@@ -1,4 +1,5 @@
 use std::marker::PhantomData;
+use std::sync::atomic::Ordering;
 
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
 use heed::types::Bytes;
@@ -177,22 +178,22 @@ impl IntoIterator for WriterReceiver {
 pub struct MergerSender {
     sender: Sender<WriterOperation>,
     /// The number of message we send in total in the channel.
-    send_count: std::cell::Cell<usize>,
+    send_count: std::sync::atomic::AtomicUsize,
     /// The number of times we sent something in a channel that was full.
-    writer_contentious_count: std::cell::Cell<usize>,
+    writer_contentious_count: std::sync::atomic::AtomicUsize,
     /// The number of times we sent something in a channel that was empty.
-    merger_contentious_count: std::cell::Cell<usize>,
+    merger_contentious_count: std::sync::atomic::AtomicUsize,
 }
 
 impl Drop for MergerSender {
     fn drop(&mut self) {
         eprintln!(
             "Merger channel stats: {} sends, {} writer contentions ({}%), {} merger contentions ({}%)",
-            self.send_count.get(),
-            self.writer_contentious_count.get(),
-            (self.writer_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0,
-            self.merger_contentious_count.get(),
-            (self.merger_contentious_count.get() as f32 / self.send_count.get() as f32) * 100.0
+            self.send_count.load(Ordering::SeqCst),
+            self.writer_contentious_count.load(Ordering::SeqCst),
+            (self.writer_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0,
+            self.merger_contentious_count.load(Ordering::SeqCst),
+            (self.merger_contentious_count.load(Ordering::SeqCst) as f32 / self.send_count.load(Ordering::SeqCst) as f32) * 100.0
         )
     }
 }
@@ -227,12 +228,12 @@ impl MergerSender {
 
     fn send(&self, op: WriterOperation) -> StdResult<(), SendError<()>> {
         if self.sender.is_full() {
-            self.writer_contentious_count.set(self.writer_contentious_count.get() + 1);
+            self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
         }
         if self.sender.is_empty() {
-            self.merger_contentious_count.set(self.merger_contentious_count.get() + 1);
+            self.merger_contentious_count.fetch_add(1, Ordering::SeqCst);
         }
-        self.send_count.set(self.send_count.get() + 1);
+        self.send_count.fetch_add(1, Ordering::SeqCst);
         match self.sender.send(op) {
             Ok(()) => Ok(()),
             Err(SendError(_)) => Err(SendError(())),
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index b317aefca..6c002b74d 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -14,7 +14,7 @@ use super::channel::*;
 use super::document_change::DocumentChange;
 use super::extract::*;
 use super::merger::merge_grenad_entries;
-use super::{StdResult, TopLevelMap};
+use super::{ItemsPool, StdResult, TopLevelMap};
 use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
 use crate::update::new::channel::ExtractorSender;
 use crate::update::GrenadParameters;
@@ -213,11 +213,12 @@ where
 
             let span = tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge");
             let _entered = span.enter();
-            let rtxn = index.read_txn().unwrap();
+
+            let rtxn_pool = ItemsPool::new(|| index.read_txn().map_err(Into::into));
             merge_grenad_entries(
                 merger_receiver,
                 merger_sender,
-                &rtxn,
+                &rtxn_pool,
                 index,
                 global_fields_ids_map_clone,
             )
diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs
index 03f7baffd..eb2ecae0a 100644
--- a/milli/src/update/new/merger.rs
+++ b/milli/src/update/new/merger.rs
@@ -1,33 +1,34 @@
-use std::fs::File;
 use std::io::{self};
 
 use bincode::ErrorKind;
-use grenad::Merger;
 use heed::types::Bytes;
 use heed::{Database, RoTxn};
+use rayon::iter::{ParallelBridge, ParallelIterator as _};
 use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::{FacetKind, HashMapMerger};
-use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update};
+use super::{Deletion, DocumentChange, Insertion, ItemsPool, KvReaderFieldId, Update};
 use crate::update::del_add::DelAdd;
 use crate::update::new::channel::MergerOperation;
 use crate::update::new::word_fst_builder::WordFstBuilder;
-use crate::update::MergeDeladdCboRoaringBitmaps;
 use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Result};
 
 /// TODO We must return some infos/stats
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
-pub fn merge_grenad_entries(
+pub fn merge_grenad_entries<'t>(
     receiver: MergerReceiver,
     sender: MergerSender,
-    rtxn: &RoTxn,
+    rtxn_pool: &ItemsPool<impl Fn() -> Result<RoTxn<'t>> + Send + Sync, RoTxn<'t>, Error>,
     index: &Index,
     mut global_fields_ids_map: GlobalFieldsIdsMap<'_>,
 ) -> Result<()> {
     let mut buffer: Vec<u8> = Vec::new();
-    let mut documents_ids = index.documents_ids(rtxn)?;
-    let mut geo_extractor = GeoExtractor::new(rtxn, index)?;
+    let (mut documents_ids, mut geo_extractor) = rtxn_pool.with(|rtxn| {
+        let documents_ids = index.documents_ids(rtxn)?;
+        let geo_extractor = GeoExtractor::new(rtxn, index)?;
+        Ok((documents_ids, geo_extractor))
+    })?;
 
     for merger_operation in receiver {
         match merger_operation {
@@ -39,10 +40,10 @@ pub fn merge_grenad_entries(
                     merger,
                     /// TODO do a MergerOperation::database(&Index) -> Database.
                     index.exact_word_docids.remap_types(),
-                    rtxn,
+                    rtxn_pool,
                     &mut buffer,
                     sender.docids::<ExactWordDocids>(),
-                    |_, _key| Ok(()),
+                    // |_, _key| Ok(()),
                 )?;
             }
             MergerOperation::FidWordCountDocidsMerger(merger) => {
@@ -51,15 +52,15 @@
                 merge_and_send_docids(
                     merger,
                     index.field_id_word_count_docids.remap_types(),
-                    rtxn,
+                    rtxn_pool,
                     &mut buffer,
                     sender.docids::<FidWordCountDocids>(),
-                    |_, _key| Ok(()),
+                    // |_, _key| Ok(()),
                 )?;
             }
             MergerOperation::WordDocidsMerger(merger) => {
-                let words_fst = index.words_fst(rtxn)?;
-                let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?;
+                // let words_fst = index.words_fst(rtxn)?;
+                // let mut word_fst_builder = WordFstBuilder::new(&words_fst, 4)?;
                 {
                     let span =
                         tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
@@ -68,10 +69,10 @@
                     merge_and_send_docids(
                         merger,
                         index.word_docids.remap_types(),
-                        rtxn,
+                        rtxn_pool,
                         &mut buffer,
                         sender.docids::<WordDocids>(),
-                        |deladd, key| word_fst_builder.register_word(deladd, key),
+                        // |deladd, key| word_fst_builder.register_word(deladd, key),
                     )?;
                 }
 
@@ -80,8 +81,8 @@
                         tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
                     let _entered = span.enter();
 
-                    let (word_fst_mmap, prefix_fst_mmap) = word_fst_builder.build()?;
-                    sender.main().write_words_fst(word_fst_mmap).unwrap();
+                    // let (word_fst_mmap, prefix_fst_mmap) = word_fst_builder.build()?;
+                    // sender.main().write_words_fst(word_fst_mmap).unwrap();
                 }
             }
             MergerOperation::WordFidDocidsMerger(merger) => {
@@ -91,10 +92,10 @@
                 merge_and_send_docids(
                     merger,
                     index.word_fid_docids.remap_types(),
-                    rtxn,
+                    rtxn_pool,
                     &mut buffer,
                     sender.docids::<WordFidDocids>(),
-                    |_, _key| Ok(()),
+                    // |_, _key| Ok(()),
                 )?;
             }
             MergerOperation::WordPairProximityDocidsMerger(merger) => {
@@ -103,10 +104,10 @@
                 merge_and_send_docids(
                     merger,
                     index.word_pair_proximity_docids.remap_types(),
-                    rtxn,
+                    rtxn_pool,
                     &mut buffer,
                     sender.docids::<WordPairProximityDocids>(),
-                    |_, _key| Ok(()),
+                    // |_, _key| Ok(()),
                 )?;
             }
             MergerOperation::WordPositionDocidsMerger(merger) => {
@@ -115,10 +116,10 @@
                 merge_and_send_docids(
                     merger,
                     index.word_position_docids.remap_types(),
-                    rtxn,
+                    rtxn_pool,
                     &mut buffer,
                     sender.docids::<WordPositionDocids>(),
-                    |_, _key| Ok(()),
+                    // |_, _key| Ok(()),
                 )?;
             }
             MergerOperation::InsertDocument { docid, document } => {
@@ -129,15 +130,21 @@
                 sender.documents().uncompressed(docid, &document).unwrap();
 
                 if let Some(geo_extractor) = geo_extractor.as_mut() {
-                    let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
-                    let current: Option<&KvReaderFieldId> = current.map(Into::into);
-                    let change = match current {
-                        Some(current) => {
-                            DocumentChange::Update(Update::create(docid, current.boxed(), document))
-                        }
-                        None => DocumentChange::Insertion(Insertion::create(docid, document)),
-                    };
-                    geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
+                    rtxn_pool.with(|rtxn| {
+                        let current =
+                            index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
+                        let current: Option<&KvReaderFieldId> = current.map(Into::into);
+                        let change = match current {
+                            Some(current) => DocumentChange::Update(Update::create(
+                                docid,
+                                current.boxed(),
+                                document,
+                            )),
+                            None => DocumentChange::Insertion(Insertion::create(docid, document)),
+                        };
+                        geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
+                        Ok(())
+                    })?;
                 }
             }
             MergerOperation::DeleteDocument { docid } => {
@@ -150,9 +157,13 @@
                 sender.documents().delete(docid).unwrap();
 
                 if let Some(geo_extractor) = geo_extractor.as_mut() {
-                    let current = index.document(rtxn, docid)?;
-                    let change = DocumentChange::Deletion(Deletion::create(docid, current.boxed()));
-                    geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
+                    rtxn_pool.with(|rtxn| {
+                        let current = index.document(rtxn, docid)?;
+                        let change =
+                            DocumentChange::Deletion(Deletion::create(docid, current.boxed()));
+                        geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
+                        Ok(())
+                    })?;
                 }
             }
             MergerOperation::FinishedDocument => {
@@ -165,7 +176,7 @@
                 merge_and_send_facet_docids(
                     merger,
                     FacetDatabases::new(index),
-                    rtxn,
+                    rtxn_pool,
                     &mut buffer,
                     sender.facet_docids(),
                 )?;
@@ -237,56 +248,60 @@ impl GeoExtractor {
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_docids(
+fn merge_and_send_docids<'t>(
     merger: HashMapMerger,
     database: Database<Bytes, Bytes>,
-    rtxn: &RoTxn<'_>,
+    rtxn_pool: &ItemsPool<impl Fn() -> Result<RoTxn<'t>> + Send + Sync, RoTxn<'t>, Error>,
     buffer: &mut Vec<u8>,
-    docids_sender: impl DocidsSender,
-    mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()>,
+    docids_sender: impl DocidsSender + Sync,
+    // mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()> + Send + Sync,
 ) -> Result<()> {
-    for (key, deladd) in merger.into_iter() {
-        let current = database.get(rtxn, &key)?;
-        match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
-            Operation::Write(bitmap) => {
-                let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
-                docids_sender.write(&key, value).unwrap();
-                register_key(DelAdd::Addition, &key)?;
+    merger.into_iter().par_bridge().try_for_each(|(key, deladd)| {
+        rtxn_pool.with(|rtxn| {
+            let mut buffer = Vec::new();
+            let current = database.get(rtxn, &key)?;
+            match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
+                Operation::Write(bitmap) => {
+                    let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
+                    docids_sender.write(&key, value).unwrap();
+                    // register_key(DelAdd::Addition, &key)?;
+                }
+                Operation::Delete => {
+                    docids_sender.delete(&key).unwrap();
+                    // register_key(DelAdd::Deletion, &key)?;
+                }
+                Operation::Ignore => (),
            }
-            Operation::Delete => {
-                docids_sender.delete(&key).unwrap();
-                register_key(DelAdd::Deletion, &key)?;
-            }
-            Operation::Ignore => (),
-        }
-    }
-
-    Ok(())
+            Ok(())
+        })
+    })
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-fn merge_and_send_facet_docids(
+fn merge_and_send_facet_docids<'t>(
     merger: HashMapMerger,
     database: FacetDatabases,
-    rtxn: &RoTxn<'_>,
+    rtxn_pool: &ItemsPool<impl Fn() -> Result<RoTxn<'t>> + Send + Sync, RoTxn<'t>, Error>,
     buffer: &mut Vec<u8>,
-    docids_sender: impl DocidsSender,
+    docids_sender: impl DocidsSender + Sync,
 ) -> Result<()> {
-    for (key, deladd) in merger.into_iter() {
-        let current = database.get(rtxn, &key)?;
-        match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
-            Operation::Write(bitmap) => {
-                let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
-                docids_sender.write(&key, value).unwrap();
+    merger.into_iter().par_bridge().try_for_each(|(key, deladd)| {
+        rtxn_pool.with(|rtxn| {
+            let mut buffer = Vec::new();
+            let current = database.get(rtxn, &key)?;
+            match merge_cbo_bitmaps(current, deladd.del, deladd.add)? {
+                Operation::Write(bitmap) => {
+                    let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
+                    docids_sender.write(&key, value).unwrap();
+                }
+                Operation::Delete => {
+                    docids_sender.delete(&key).unwrap();
+                }
+                Operation::Ignore => (),
            }
-            Operation::Delete => {
-                docids_sender.delete(&key).unwrap();
-            }
-            Operation::Ignore => (),
-        }
-    }
-
-    Ok(())
+            Ok(())
+        })
+    })
 }
 
 struct FacetDatabases {