WIP add more logs

This commit is contained in:
Clément Renault 2024-09-26 16:37:38 +02:00
parent ac2d54b27c
commit d6b3aae8a6
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 20 additions and 3 deletions

View File

@ -51,6 +51,7 @@ impl IntoIterator for HashMapMerger {
fn into_iter(self) -> Self::IntoIter { fn into_iter(self) -> Self::IntoIter {
let mut entries: Vec<_> = self.maps.into_iter().flat_map(|m| m.into_iter()).collect(); let mut entries: Vec<_> = self.maps.into_iter().flat_map(|m| m.into_iter()).collect();
eprintln!("There are {} entries in the HashMapMerger", entries.len());
entries.par_sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb)); entries.par_sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb));
IntoIter { IntoIter {
sorted_entries: entries.into_iter(), sorted_entries: entries.into_iter(),

View File

@ -224,6 +224,7 @@ where
) )
})?; })?;
let mut entries_count = 0;
for operation in writer_receiver { for operation in writer_receiver {
let database = operation.database(index); let database = operation.database(index);
match operation.entry() { match operation.entry() {
@ -232,10 +233,15 @@ where
unreachable!("We tried to delete an unknown key") unreachable!("We tried to delete an unknown key")
} }
} }
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, EntryOperation::Write(e) => {
entries_count += 1;
database.put(wtxn, e.key(), e.value())?
}
} }
} }
eprintln!("We saw {entries_count}");
/// TODO handle the panicking threads /// TODO handle the panicking threads
handle.join().unwrap()?; handle.join().unwrap()?;
handle2.join().unwrap()?; handle2.join().unwrap()?;

View File

@ -248,6 +248,7 @@ impl GeoExtractor {
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
#[inline(never)]
fn merge_and_send_docids<'t>( fn merge_and_send_docids<'t>(
merger: HashMapMerger, merger: HashMapMerger,
database: Database<Bytes, Bytes>, database: Database<Bytes, Bytes>,
@ -256,6 +257,7 @@ fn merge_and_send_docids<'t>(
docids_sender: impl DocidsSender + Sync, docids_sender: impl DocidsSender + Sync,
// mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()> + Send + Sync, // mut register_key: impl FnMut(DelAdd, &[u8]) -> Result<()> + Send + Sync,
) -> Result<()> { ) -> Result<()> {
let now = std::time::Instant::now();
merger.into_iter().par_bridge().try_for_each(|(key, deladd)| { merger.into_iter().par_bridge().try_for_each(|(key, deladd)| {
rtxn_pool.with(|rtxn| { rtxn_pool.with(|rtxn| {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
@ -274,10 +276,15 @@ fn merge_and_send_docids<'t>(
} }
Ok(()) Ok(())
}) })
}) })?;
eprintln!("I took to merger hashmaps {:.2?}", now.elapsed());
Ok(())
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
#[inline(never)]
fn merge_and_send_facet_docids<'t>( fn merge_and_send_facet_docids<'t>(
merger: HashMapMerger, merger: HashMapMerger,
database: FacetDatabases, database: FacetDatabases,
@ -285,6 +292,7 @@ fn merge_and_send_facet_docids<'t>(
buffer: &mut Vec<u8>, buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender + Sync, docids_sender: impl DocidsSender + Sync,
) -> Result<()> { ) -> Result<()> {
let now = std::time::Instant::now();
merger.into_iter().par_bridge().try_for_each(|(key, deladd)| { merger.into_iter().par_bridge().try_for_each(|(key, deladd)| {
rtxn_pool.with(|rtxn| { rtxn_pool.with(|rtxn| {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
@ -301,7 +309,9 @@ fn merge_and_send_facet_docids<'t>(
} }
Ok(()) Ok(())
}) })
}) })?;
eprintln!("I took to merger hashmaps {:.2?}", now.elapsed());
Ok(())
} }
struct FacetDatabases { struct FacetDatabases {