Use sled to count the write insertions

This commit is contained in:
Clément Renault 2024-07-20 11:16:57 +02:00
parent 2603d8d0d0
commit f355cf6985
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
15 changed files with 106 additions and 72 deletions

View file

@ -19,7 +19,7 @@ pub struct SorterCacheDelAddCboRoaringBitmap<const N: usize, MF> {
sorter: grenad::Sorter<MF>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
conn: redis::Connection,
conn: sled::Db,
}
impl<const N: usize, MF> SorterCacheDelAddCboRoaringBitmap<N, MF> {
@ -27,7 +27,7 @@ impl<const N: usize, MF> SorterCacheDelAddCboRoaringBitmap<N, MF> {
cap: NonZeroUsize,
sorter: grenad::Sorter<MF>,
prefix: &'static [u8; 3],
conn: redis::Connection,
conn: sled::Db,
) -> Self {
SorterCacheDelAddCboRoaringBitmap {
cache: ArcCache::new(cap),
@ -205,7 +205,7 @@ where
self.cbo_buffer.clear();
self.cbo_buffer.extend_from_slice(self.prefix);
self.cbo_buffer.extend_from_slice(key.as_ref());
redis::cmd("INCR").arg(&self.cbo_buffer).query::<usize>(&mut self.conn).unwrap();
self.conn.merge(&self.cbo_buffer, 1u32.to_ne_bytes()).unwrap();
self.sorter.insert(key, value_writer.into_inner().unwrap())
}

View file

@ -29,7 +29,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
settings_diff: &InnerIndexSettingsDiff,
max_positions_per_attributes: Option<u32>,
) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let conn = super::SLED_DB.clone();
let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
@ -150,7 +150,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
for (field_id, value) in obkv.iter() {
key_buffer.truncate(mem::size_of::<u32>());
key_buffer.extend_from_slice(&field_id.to_be_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(&mut conn).unwrap();
conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap();
docid_word_positions_sorter.insert(&key_buffer, value)?;
}

View file

@ -41,7 +41,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(20).unwrap(),
facet_number_docids_sorter,
b"fnd",
super::REDIS_CLIENT.get_connection().unwrap(),
super::SLED_DB.clone(),
);
let mut cursor = fid_docid_facet_number.into_cursor()?;

View file

@ -10,7 +10,7 @@ use heed::types::SerdeJson;
use heed::BytesEncode;
use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters};
use super::REDIS_CLIENT;
use super::SLED_DB;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
use crate::heed_codec::{BEU16StrCodec, StrRefCodec};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
@ -32,7 +32,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
let mut conn = REDIS_CLIENT.get_connection().unwrap();
let conn = SLED_DB.clone();
let max_memory = indexer.max_memory_by_thread();
let options = NormalizerOption { lossy: true, ..Default::default() };
@ -49,7 +49,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(200).unwrap(),
facet_string_docids_sorter,
b"fsd",
REDIS_CLIENT.get_connection().unwrap(),
SLED_DB.clone(),
);
let mut normalized_facet_string_docids_sorter = create_sorter(
@ -106,7 +106,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
let key = (field_id, hyper_normalized_value.as_ref());
let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
redis::cmd("INCR").arg(key_bytes.as_ref()).query::<usize>(&mut conn).unwrap();
conn.merge(key_bytes.as_ref(), 1u32.to_ne_bytes()).unwrap();
normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?;
}

View file

@ -46,7 +46,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff,
) -> Result<ExtractedFacetValues> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let mut conn = super::SLED_DB.clone();
let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter(
@ -334,7 +334,7 @@ fn insert_numbers_diff<MF>(
key_buffer: &mut Vec<u8>,
mut del_numbers: Vec<f64>,
mut add_numbers: Vec<f64>,
conn: &mut redis::Connection,
conn: &mut sled::Db,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
@ -366,7 +366,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, bytes_of(&()))?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap();
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
@ -380,7 +380,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, bytes_of(&()))?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap();
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
@ -397,7 +397,7 @@ fn insert_strings_diff<MF>(
key_buffer: &mut Vec<u8>,
mut del_strings: Vec<(String, String)>,
mut add_strings: Vec<(String, String)>,
conn: &mut redis::Connection,
conn: &mut sled::Db,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
@ -426,7 +426,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, original)?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap();
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
EitherOrBoth::Right((normalized, original)) => {
@ -436,7 +436,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, original)?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
conn.merge(key_buffer.as_slice(), 1u32.to_ne_bytes()).unwrap();
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
}

View file

@ -44,7 +44,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(300).unwrap(),
fid_word_count_docids_sorter,
b"fwc",
super::REDIS_CLIENT.get_connection().unwrap(),
super::SLED_DB.clone(),
);
let mut key_buffer = Vec::new();

View file

@ -11,7 +11,7 @@ use super::helpers::{
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
writer_into_reader, GrenadParameters,
};
use super::REDIS_CLIENT;
use super::SLED_DB;
use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec;
use crate::index::db_name::DOCID_WORD_POSITIONS;
@ -53,7 +53,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(300).unwrap(),
word_fid_docids_sorter,
b"wfd",
REDIS_CLIENT.get_connection().unwrap(),
SLED_DB.clone(),
);
let mut key_buffer = Vec::new();
@ -114,7 +114,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(100).unwrap(),
word_docids_sorter,
b"wdi",
REDIS_CLIENT.get_connection().unwrap(),
SLED_DB.clone(),
);
let exact_word_docids_sorter = create_sorter(
@ -129,7 +129,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(100).unwrap(),
exact_word_docids_sorter,
b"ewd",
REDIS_CLIENT.get_connection().unwrap(),
SLED_DB.clone(),
);
let mut iter = cached_word_fid_docids_sorter.into_sorter()?.into_stream_merger_iter()?;
@ -221,7 +221,7 @@ fn docids_into_writers<W>(
deletions: &RoaringBitmap,
additions: &RoaringBitmap,
writer: &mut grenad::Writer<W>,
conn: &mut redis::Connection,
conn: &mut sled::Db,
) -> Result<()>
where
W: std::io::Write,
@ -253,7 +253,7 @@ where
}
// insert everything in the same writer.
redis::cmd("INCR").arg(word.as_bytes()).query::<usize>(conn).unwrap();
conn.merge(word.as_bytes(), 1u32.to_ne_bytes()).unwrap();
writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?;
Ok(())

View file

@ -56,7 +56,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(100).unwrap(),
sorter,
b"wpp",
super::REDIS_CLIENT.get_connection().unwrap(),
super::SLED_DB.clone(),
)
})
.collect();

View file

@ -42,7 +42,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
NonZeroUsize::new(300).unwrap(),
word_position_docids_sorter,
b"wpd",
super::REDIS_CLIENT.get_connection().unwrap(),
super::SLED_DB.clone(),
);
let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();

View file

@ -35,8 +35,22 @@ use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::error::PossibleEmbeddingMistakes;
use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
/// Shared Redis client, built the first time the static is dereferenced.
///
/// The client is opened against `redis://127.0.0.1/`.
///
/// # Panics
///
/// The initializer unwraps the result of `redis::Client::open`, so the first
/// access panics if that call returns an error.
pub static REDIS_CLIENT: once_cell::sync::Lazy<redis::Client> =
once_cell::sync::Lazy::new(|| redis::Client::open("redis://127.0.0.1/").unwrap());
pub static SLED_DB: once_cell::sync::Lazy<sled::Db> = once_cell::sync::Lazy::new(|| {
/// Merge operator that maintains a saturating `u32` counter per key.
///
/// * `_key` - the key being merged; unused.
/// * `old_value` - the counter currently stored for the key, as four
///   native-endian bytes, or `None` when the key has no value yet (the count
///   then starts at zero).
/// * `merged_bytes` - the increment to apply, as four native-endian bytes
///   (callers pass `1u32.to_ne_bytes()`).
///
/// Returns the updated counter encoded as four native-endian bytes. The sum
/// saturates at `u32::MAX` instead of wrapping.
///
/// # Panics
///
/// Panics if `old_value` or `merged_bytes` is not exactly four bytes long.
fn increment_u32(
    _key: &[u8],
    old_value: Option<&[u8]>,
    merged_bytes: &[u8],
) -> Option<Vec<u8>> {
    let current_count = old_value.map_or(0, |b| b.try_into().map(u32::from_ne_bytes).unwrap());
    let new_count = merged_bytes.try_into().map(u32::from_ne_bytes).unwrap();
    // The value written here is what the next merge receives as `old_value`
    // and decodes with `from_ne_bytes`, so it must be encoded native-endian
    // too. Encoding it big-endian corrupts the counter on little-endian hosts
    // as soon as a key is merged a second time.
    let count = current_count.saturating_add(new_count).to_ne_bytes();
    Some(count.to_vec())
}
let db = sled::open("write-stats").unwrap();
db.set_merge_operator(increment_u32);
db
});
/// Extract data for each databases from obkv documents in parallel.
/// Send data in grenad file over provided Sender.

View file

@ -14,7 +14,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender};
pub use extract::REDIS_CLIENT;
pub use extract::SLED_DB;
use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;

View file

@ -6,7 +6,7 @@ use heed::types::Str;
use heed::Database;
use super::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use super::index_documents::REDIS_CLIENT;
use super::index_documents::SLED_DB;
use crate::update::del_add::deladd_serialize_add_side;
use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps,
@ -69,7 +69,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
NonZeroUsize::new(200).unwrap(),
prefix_docids_sorter,
b"pdi",
REDIS_CLIENT.get_connection().unwrap(),
SLED_DB.clone(),
);
if !common_prefix_fst_words.is_empty() {

View file

@ -15,7 +15,7 @@ use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
write_sorter_into_database, CursorClonableMmap, MergeFn, REDIS_CLIENT,
write_sorter_into_database, CursorClonableMmap, MergeFn, SLED_DB,
};
use crate::{CboRoaringBitmapCodec, Result};
@ -74,7 +74,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
NonZeroUsize::new(200).unwrap(),
prefix_integer_docids_sorter,
b"pid",
REDIS_CLIENT.get_connection().unwrap(),
SLED_DB.clone(),
);
if !common_prefix_fst_words.is_empty() {