From f17cb2ef5b73ac4f06f5ed85baef62a3aee4432e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 17 Jul 2024 13:55:51 +0200 Subject: [PATCH] Use Redis to measure the Sorter insertions --- Cargo.lock | 32 +++++++++++++++++++ milli/Cargo.toml | 2 ++ .../extract/extract_docid_word_positions.rs | 3 ++ .../extract/extract_fid_word_count_docids.rs | 3 ++ .../extract/extract_word_docids.rs | 8 +++++ .../extract_word_pair_proximity_docids.rs | 6 ++++ .../extract/extract_word_position_docids.rs | 5 +++ .../src/update/index_documents/extract/mod.rs | 3 ++ 8 files changed, 62 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3b5ae4487..a5ff2f7c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1047,6 +1047,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "concat-arrays" version = "0.1.2" @@ -3552,6 +3562,7 @@ dependencies = [ "rand", "rayon", "rayon-par-bridge", + "redis", "rhai", "roaring", "rstar", @@ -4420,6 +4431,21 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" +[[package]] +name = "redis" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +dependencies = [ + "combine", + "itoa", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2 0.5.5", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4891,6 +4917,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.8" diff --git a/milli/Cargo.toml b/milli/Cargo.toml index e635bbcf4..a9fc84cab 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -67,6 +67,8 @@ filter-parser = { path = "../filter-parser" } # documents words self-join itertools = "0.13.0" +redis = "0.25.4" + csv = "1.3.0" candle-core = { version = "0.6.0" } candle-transformers = { version = "0.6.0" } diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index 721d67e96..96e96701c 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -29,6 +29,8 @@ pub fn extract_docid_word_positions( settings_diff: &InnerIndexSettingsDiff, max_positions_per_attributes: Option, ) -> Result<(grenad::Reader>, ScriptLanguageDocidsMap)> { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); + let max_positions_per_attributes = max_positions_per_attributes .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); let max_memory = indexer.max_memory_by_thread(); @@ -148,6 +150,7 @@ pub fn extract_docid_word_positions( for (field_id, value) in obkv.iter() { key_buffer.truncate(mem::size_of::()); key_buffer.extend_from_slice(&field_id.to_be_bytes()); + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(&mut conn).unwrap(); docid_word_positions_sorter.insert(&key_buffer, value)?; } diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index f252df1cd..26ad68b71 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -26,6 +26,7 @@ pub fn extract_fid_word_count_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); let max_memory = indexer.max_memory_by_thread(); let mut fid_word_count_docids_sorter = create_sorter( @@ -70,6 +71,7 @@ pub fn extract_fid_word_count_docids( key_buffer.clear(); key_buffer.extend_from_slice(fid_bytes); key_buffer.push(word_count as u8); + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(&mut conn).unwrap(); fid_word_count_docids_sorter .insert(&key_buffer, value_writer.into_inner().unwrap())?; } @@ -81,6 +83,7 @@ pub fn extract_fid_word_count_docids( key_buffer.clear(); key_buffer.extend_from_slice(fid_bytes); key_buffer.push(word_count as u8); + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(&mut conn).unwrap(); fid_word_count_docids_sorter .insert(&key_buffer, value_writer.into_inner().unwrap())?; } diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 457d2359e..5574fca62 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -10,6 +10,7 @@ use super::helpers::{ create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, writer_into_reader, GrenadParameters, }; +use super::REDIS_CLIENT; use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; @@ -37,6 +38,7 @@ pub fn extract_word_docids( grenad::Reader>, )> { let max_memory = indexer.max_memory_by_thread(); + let mut conn = REDIS_CLIENT.get_connection().unwrap(); let mut word_fid_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, @@ -80,6 +82,7 @@ pub fn extract_word_docids( &del_words, &add_words, &mut word_fid_docids_sorter, + &mut conn, )?; del_words.clear(); @@ -164,6 +167,7 @@ fn words_into_sorter( del_words: &BTreeSet>, add_words: &BTreeSet>, word_fid_docids_sorter: &mut grenad::Sorter, + conn: &mut redis::Connection, ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; @@ -192,18 +196,21 @@ fn words_into_sorter( key_buffer.extend_from_slice(word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&fid.to_be_bytes()); + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; } Ok(()) } +// TODO do we still use this? #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] fn docids_into_writers( word: &str, deletions: &RoaringBitmap, additions: &RoaringBitmap, writer: &mut grenad::Writer, + conn: &mut redis::Connection, ) -> Result<()> where W: std::io::Write, @@ -235,6 +242,7 @@ where } // insert everything in the same writer. + redis::cmd("INCR").arg(word.as_bytes()).query::(conn).unwrap(); writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?; Ok(()) diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 5a9363942..23bad9b2e 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -26,6 +26,8 @@ pub fn extract_word_pair_proximity_docids( indexer: GrenadParameters, settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); + // early return if the data shouldn't be deleted nor created. if settings_diff.settings_update_only && !settings_diff.reindex_proximities() { let writer = create_writer( @@ -78,6 +80,7 @@ pub fn extract_word_pair_proximity_docids( &del_word_pair_proximity, &add_word_pair_proximity, &mut word_pair_proximity_docids_sorters, + &mut conn, )?; del_word_pair_proximity.clear(); add_word_pair_proximity.clear(); @@ -168,6 +171,7 @@ pub fn extract_word_pair_proximity_docids( &del_word_pair_proximity, &add_word_pair_proximity, &mut word_pair_proximity_docids_sorters, + &mut conn, )?; } { @@ -198,6 +202,7 @@ fn document_word_positions_into_sorter( del_word_pair_proximity: &BTreeMap<(String, String), u8>, add_word_pair_proximity: &BTreeMap<(String, String), u8>, word_pair_proximity_docids_sorters: &mut [grenad::Sorter], + conn: &mut redis::Connection, ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; @@ -233,6 +238,7 @@ fn document_word_positions_into_sorter( key_buffer.push(0); key_buffer.extend_from_slice(w2.as_bytes()); + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); word_pair_proximity_docids_sorters[*prox as usize - 1] .insert(&key_buffer, value_writer.into_inner().unwrap())?; } diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 50b1617f9..0c169bfd3 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -25,6 +25,7 @@ pub fn extract_word_position_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); let max_memory = indexer.max_memory_by_thread(); let mut word_position_docids_sorter = create_sorter( @@ -53,6 +54,7 @@ pub fn extract_word_position_docids( &del_word_positions, &add_word_positions, &mut word_position_docids_sorter, + &mut conn, )?; del_word_positions.clear(); add_word_positions.clear(); @@ -85,6 +87,7 @@ pub fn extract_word_position_docids( &del_word_positions, &add_word_positions, &mut word_position_docids_sorter, + &mut conn, )?; } @@ -101,6 +104,7 @@ fn words_position_into_sorter( del_word_positions: &BTreeSet<(u16, Vec)>, add_word_positions: &BTreeSet<(u16, Vec)>, word_position_docids_sorter: &mut grenad::Sorter, + conn: &mut redis::Connection, ) -> Result<()> { use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; @@ -131,6 +135,7 @@ fn words_position_into_sorter( key_buffer.extend_from_slice(word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&position.to_be_bytes()); + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; } diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 57d9d5e42..8704df03b 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -35,6 +35,9 @@ use crate::update::settings::InnerIndexSettingsDiff; use crate::vector::error::PossibleEmbeddingMistakes; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; +pub static REDIS_CLIENT: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(|| redis::Client::open("redis://127.0.0.1/").unwrap()); + /// Extract data for each databases from obkv documents in parallel. /// Send data in grenad file over provided Sender. #[allow(clippy::too_many_arguments)]