From f355cf69854671e196d3b493345821b6523c6144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 20 Jul 2024 11:16:57 +0200 Subject: [PATCH] Use sled to count the write insertions --- Cargo.lock | 96 +++++++++++-------- milli/Cargo.toml | 2 +- milli/src/update/index_documents/cache.rs | 6 +- .../extract/extract_docid_word_positions.rs | 4 +- .../extract/extract_facet_number_docids.rs | 2 +- .../extract/extract_facet_string_docids.rs | 8 +- .../extract/extract_fid_docid_facet_values.rs | 14 +-- .../extract/extract_fid_word_count_docids.rs | 2 +- .../extract/extract_word_docids.rs | 12 +-- .../extract_word_pair_proximity_docids.rs | 2 +- .../extract/extract_word_position_docids.rs | 2 +- .../src/update/index_documents/extract/mod.rs | 18 +++- milli/src/update/index_documents/mod.rs | 2 +- milli/src/update/word_prefix_docids.rs | 4 +- .../src/update/words_prefix_integer_docids.rs | 4 +- 15 files changed, 106 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7cfff278d..aa412dacf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1047,16 +1047,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "combine" -version = "4.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" -dependencies = [ - "bytes", - "memchr", -] - [[package]] name = "concat-arrays" version = "0.1.2" @@ -1889,6 +1879,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fst" version = "0.4.7" @@ -3404,7 +3404,7 @@ dependencies = [ "obkv", "once_cell", "ordered-float", - "parking_lot", + "parking_lot 0.12.3", "permissive-json-pointer", "pin-project-lite", "platform-dirs", @@ -3572,12 +3572,12 @@ dependencies = [ "rand", "rayon", "rayon-par-bridge", - "redis", "rhai", "roaring", "rstar", "serde", "serde_json", + "sled", "slice-group-by", "smallstr", "smallvec", @@ -3891,6 +3891,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -3898,7 +3909,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.8", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -4240,7 +4265,7 @@ dependencies = [ "lazy_static", "libc", "memchr", - "parking_lot", + "parking_lot 0.12.3", "procfs", "protobuf", "thiserror", @@ -4441,21 +4466,6 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" -[[package]] -name = "redis" -version = "0.25.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" -dependencies = [ - "combine", - "itoa", - "percent-encoding", - "ryu", - "sha1_smol", - "socket2 0.5.5", - "url", -] - [[package]] name = "redox_syscall" version = "0.2.16" @@ -4927,12 +4937,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.10.8" @@ -5019,6 +5023,22 @@ dependencies = [ "autocfg", ] +[[package]] +name = "sled" +version = "0.34.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" +dependencies = [ + "crc32fast", + "crossbeam-epoch", + "crossbeam-utils", + "fs2", + "fxhash", + "libc", + "log", + "parking_lot 0.11.2", +] + [[package]] name = "slice-group-by" version = "0.3.1" @@ -5280,7 +5300,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96374855068f47402c3121c6eed88d29cb1de8f3ab27090e273e420bdabcf050" dependencies = [ - "parking_lot", + "parking_lot 0.12.3", ] [[package]] @@ -5354,7 +5374,7 @@ dependencies = [ "bstr", "fancy-regex 0.12.0", "lazy_static", - "parking_lot", + "parking_lot 0.12.3", "rustc-hash", ] @@ -5466,7 +5486,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 12519062d..9a9da9556 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -67,7 +67,7 @@ filter-parser = { path = "../filter-parser" } # documents words self-join itertools = "0.13.0" -redis = "0.25.4" +sled = "0.34.7" csv = "1.3.0" candle-core = { version = "0.6.0" } diff --git a/milli/src/update/index_documents/cache.rs b/milli/src/update/index_documents/cache.rs index 0c8799d52..1c9079b1f 100644 --- a/milli/src/update/index_documents/cache.rs +++ b/milli/src/update/index_documents/cache.rs @@ -19,7 +19,7 @@ pub struct SorterCacheDelAddCboRoaringBitmap { sorter: grenad::Sorter, deladd_buffer: Vec, cbo_buffer: Vec, - conn: redis::Connection, + conn: sled::Db, } impl SorterCacheDelAddCboRoaringBitmap { @@ -27,7 +27,7 @@ impl SorterCacheDelAddCboRoaringBitmap { cap: NonZeroUsize, sorter: grenad::Sorter, prefix: &'static [u8; 3], - conn: redis::Connection, + conn: sled::Db, ) -> Self { SorterCacheDelAddCboRoaringBitmap { cache: ArcCache::new(cap), @@ -205,7 +205,7 @@ where self.cbo_buffer.clear(); self.cbo_buffer.extend_from_slice(self.prefix); self.cbo_buffer.extend_from_slice(key.as_ref()); - redis::cmd("INCR").arg(&self.cbo_buffer).query::(&mut self.conn).unwrap(); + self.conn.merge(&self.cbo_buffer, 1u32.to_ne_bytes()).unwrap(); self.sorter.insert(key, value_writer.into_inner().unwrap()) } diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index 96e96701c..76d74bcd4 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -29,7 +29,7 @@ pub fn extract_docid_word_positions( settings_diff: &InnerIndexSettingsDiff, max_positions_per_attributes: Option, ) -> Result<(grenad::Reader>, ScriptLanguageDocidsMap)> { - let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); + let conn = super::SLED_DB.clone(); let max_positions_per_attributes = max_positions_per_attributes .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); @@ -150,7 +150,7 @@ pub fn extract_docid_word_positions( for (field_id, value) in obkv.iter() { key_buffer.truncate(mem::size_of::()); key_buffer.extend_from_slice(&field_id.to_be_bytes()); - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(&mut conn).unwrap(); + conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap(); docid_word_positions_sorter.insert(&key_buffer, value)?; } diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index f36903b8f..dd71c5b36 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -41,7 +41,7 @@ pub fn extract_facet_number_docids( NonZeroUsize::new(20).unwrap(), facet_number_docids_sorter, b"fnd", - super::REDIS_CLIENT.get_connection().unwrap(), + super::SLED_DB.clone(), ); let mut cursor = fid_docid_facet_number.into_cursor()?; diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index da6b8fd82..52e734269 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -10,7 +10,7 @@ use heed::types::SerdeJson; use heed::BytesEncode; use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters}; -use super::REDIS_CLIENT; +use super::SLED_DB; use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec}; use crate::heed_codec::{BEU16StrCodec, StrRefCodec}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; @@ -32,7 +32,7 @@ pub fn extract_facet_string_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result<(grenad::Reader>, grenad::Reader>)> { - let mut conn = REDIS_CLIENT.get_connection().unwrap(); + let conn = SLED_DB.clone(); let max_memory = indexer.max_memory_by_thread(); let options = NormalizerOption { lossy: true, ..Default::default() }; @@ -49,7 +49,7 @@ pub fn extract_facet_string_docids( NonZeroUsize::new(200).unwrap(), facet_string_docids_sorter, b"fsd", - REDIS_CLIENT.get_connection().unwrap(), + SLED_DB.clone(), ); let mut normalized_facet_string_docids_sorter = create_sorter( @@ -106,7 +106,7 @@ pub fn extract_facet_string_docids( let key = (field_id, hyper_normalized_value.as_ref()); let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; - redis::cmd("INCR").arg(key_bytes.as_ref()).query::(&mut conn).unwrap(); + conn.merge(key_bytes.as_ref(), 1u32.to_ne_bytes()).unwrap(); normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?; } diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 14561281b..c3b21c9e2 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -46,7 +46,7 @@ pub fn extract_fid_docid_facet_values( indexer: GrenadParameters, settings_diff: &InnerIndexSettingsDiff, ) -> Result { - let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); + let mut conn = super::SLED_DB.clone(); let max_memory = indexer.max_memory_by_thread(); let mut fid_docid_facet_numbers_sorter = create_sorter( @@ -334,7 +334,7 @@ fn insert_numbers_diff( key_buffer: &mut Vec, mut del_numbers: Vec, mut add_numbers: Vec, - conn: &mut redis::Connection, + conn: &mut sled::Db, ) -> Result<()> where MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, @@ -366,7 +366,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Deletion, bytes_of(&()))?; let bytes = obkv.into_inner()?; - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); + conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap(); fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; } } @@ -380,7 +380,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Addition, bytes_of(&()))?; let bytes = obkv.into_inner()?; - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); + conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap(); fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; } } @@ -397,7 +397,7 @@ fn insert_strings_diff( key_buffer: &mut Vec, mut del_strings: Vec<(String, String)>, mut add_strings: Vec<(String, String)>, - conn: &mut redis::Connection, + conn: &mut sled::Db, ) -> Result<()> where MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, @@ -426,7 +426,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Deletion, original)?; let bytes = obkv.into_inner()?; - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); + conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap(); fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; } EitherOrBoth::Right((normalized, original)) => { @@ -436,7 +436,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Addition, original)?; let bytes = obkv.into_inner()?; - redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); + conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap(); fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; } } diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index 49de09eee..863a6ea21 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -44,7 +44,7 @@ pub fn extract_fid_word_count_docids( NonZeroUsize::new(300).unwrap(), fid_word_count_docids_sorter, b"fwc", - super::REDIS_CLIENT.get_connection().unwrap(), + super::SLED_DB.clone(), ); let mut key_buffer = Vec::new(); diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index fca9ca3b1..15d165513 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -11,7 +11,7 @@ use super::helpers::{ create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, writer_into_reader, GrenadParameters, }; -use super::REDIS_CLIENT; +use super::SLED_DB; use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; @@ -53,7 +53,7 @@ pub fn extract_word_docids( NonZeroUsize::new(300).unwrap(), word_fid_docids_sorter, b"wfd", - REDIS_CLIENT.get_connection().unwrap(), + SLED_DB.clone(), ); let mut key_buffer = Vec::new(); @@ -114,7 +114,7 @@ pub fn extract_word_docids( NonZeroUsize::new(100).unwrap(), word_docids_sorter, b"wdi", - REDIS_CLIENT.get_connection().unwrap(), + SLED_DB.clone(), ); let exact_word_docids_sorter = create_sorter( @@ -129,7 +129,7 @@ pub fn extract_word_docids( NonZeroUsize::new(100).unwrap(), exact_word_docids_sorter, b"ewd", - REDIS_CLIENT.get_connection().unwrap(), + SLED_DB.clone(), ); let mut iter = cached_word_fid_docids_sorter.into_sorter()?.into_stream_merger_iter()?; @@ -221,7 +221,7 @@ fn docids_into_writers( deletions: &RoaringBitmap, additions: &RoaringBitmap, writer: &mut grenad::Writer, - conn: &mut redis::Connection, + conn: &mut sled::Db, ) -> Result<()> where W: std::io::Write, @@ -253,7 +253,7 @@ where } // insert everything in the same writer. - redis::cmd("INCR").arg(word.as_bytes()).query::(conn).unwrap(); + conn.merge(word.as_bytes(), 1u32.to_ne_bytes()).unwrap(); writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?; Ok(()) diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 6b7a2496c..0af75cf70 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -56,7 +56,7 @@ pub fn extract_word_pair_proximity_docids( NonZeroUsize::new(100).unwrap(), sorter, b"wpp", - super::REDIS_CLIENT.get_connection().unwrap(), + super::SLED_DB.clone(), ) }) .collect(); diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 7343b6e6b..d85df886f 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -42,7 +42,7 @@ pub fn extract_word_position_docids( NonZeroUsize::new(300).unwrap(), word_position_docids_sorter, b"wpd", - super::REDIS_CLIENT.get_connection().unwrap(), + super::SLED_DB.clone(), ); let mut del_word_positions: BTreeSet<(u16, Vec)> = BTreeSet::new(); diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 8704df03b..1ad6ec84d 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -35,8 +35,22 @@ use crate::update::settings::InnerIndexSettingsDiff; use crate::vector::error::PossibleEmbeddingMistakes; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; -pub static REDIS_CLIENT: once_cell::sync::Lazy = - once_cell::sync::Lazy::new(|| redis::Client::open("redis://127.0.0.1/").unwrap()); +pub static SLED_DB: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| { + fn increment_u32( + _key: &[u8], + old_value: Option<&[u8]>, + merged_bytes: &[u8], + ) -> Option> { + let current_count = old_value.map_or(0, |b| b.try_into().map(u32::from_ne_bytes).unwrap()); + let new_count = merged_bytes.try_into().map(u32::from_ne_bytes).unwrap(); + let count = current_count.saturating_add(new_count).to_be_bytes(); + Some(count.to_vec()) + } + + let db = sled::open("write-stats").unwrap(); + db.set_merge_operator(increment_u32); + db +}); /// Extract data for each databases from obkv documents in parallel. /// Send data in grenad file over provided Sender. diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index c6dfd86a7..5b4503f51 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -14,7 +14,7 @@ use std::result::Result as StdResult; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender}; -pub use extract::REDIS_CLIENT; +pub use extract::SLED_DB; use grenad::{Merger, MergerBuilder}; use heed::types::Str; use heed::Database; diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index c551642ba..8fa70dbcf 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -6,7 +6,7 @@ use heed::types::Str; use heed::Database; use super::index_documents::cache::SorterCacheDelAddCboRoaringBitmap; -use super::index_documents::REDIS_CLIENT; +use super::index_documents::SLED_DB; use crate::update::del_add::deladd_serialize_add_side; use crate::update::index_documents::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, @@ -69,7 +69,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { NonZeroUsize::new(200).unwrap(), prefix_docids_sorter, b"pdi", - REDIS_CLIENT.get_connection().unwrap(), + SLED_DB.clone(), ); if !common_prefix_fst_words.is_empty() { diff --git a/milli/src/update/words_prefix_integer_docids.rs b/milli/src/update/words_prefix_integer_docids.rs index 038882530..088069b45 100644 --- a/milli/src/update/words_prefix_integer_docids.rs +++ b/milli/src/update/words_prefix_integer_docids.rs @@ -15,7 +15,7 @@ use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap; use crate::update::index_documents::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, - write_sorter_into_database, CursorClonableMmap, MergeFn, REDIS_CLIENT, + write_sorter_into_database, CursorClonableMmap, MergeFn, SLED_DB, }; use crate::{CboRoaringBitmapCodec, Result}; @@ -74,7 +74,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { NonZeroUsize::new(200).unwrap(), prefix_integer_docids_sorter, b"pid", - REDIS_CLIENT.get_connection().unwrap(), + SLED_DB.clone(), ); if !common_prefix_fst_words.is_empty() {