From 5d5769fd8a3e0fe4b2f0c736e7077e35c4f7e9c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 17 Jul 2024 15:54:19 +0200 Subject: [PATCH] Introduce a new Sorter Cache for CboRoaringBitmaps --- Cargo.lock | 10 ++ milli/Cargo.toml | 3 +- milli/src/update/index_documents/cache.rs | 138 ++++++++++++++++++++++ milli/src/update/index_documents/mod.rs | 1 + 4 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 milli/src/update/index_documents/cache.rs diff --git a/Cargo.lock b/Cargo.lock index a5ff2f7c1..7cfff278d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3290,6 +3290,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "lzma-rs" version = "0.3.0" @@ -3550,6 +3559,7 @@ dependencies = [ "json-depth-checker", "levenshtein_automata", "liquid", + "lru", "maplit", "md5", "meili-snap", diff --git a/milli/Cargo.toml b/milli/Cargo.toml index a9fc84cab..12519062d 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -50,7 +50,7 @@ serde = { version = "1.0.204", features = ["derive"] } serde_json = { version = "1.0.120", features = ["preserve_order"] } slice-group-by = "0.3.1" smallstr = { version = "0.3.0", features = ["serde"] } -smallvec = "1.13.2" +smallvec = { version = "1.13.2", features = ["union"] } smartstring = "1.0.1" tempfile = "3.10.1" thiserror = "1.0.61" @@ -88,6 +88,7 @@ tracing = "0.1.40" ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" rayon-par-bridge = "0.1.0" +lru = "0.12.3" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/milli/src/update/index_documents/cache.rs b/milli/src/update/index_documents/cache.rs new file mode 100644 index 000000000..8c6a99347 --- /dev/null +++ b/milli/src/update/index_documents/cache.rs @@ -0,0 +1,138 @@ +use std::borrow::Cow; +use std::mem; +use std::num::NonZeroUsize; + +use lru::LruCache; +use roaring::RoaringBitmap; +use smallvec::SmallVec; + +use crate::update::del_add::{DelAdd, KvWriterDelAdd}; +use crate::CboRoaringBitmapCodec; + +pub struct SorterCacheDelAddCboRoaringBitmap { + cache: LruCache, DelAddRoaringBitmap>, + sorter: grenad::Sorter, + deladd_buffer: Vec, + cbo_buffer: Vec, + conn: redis::Connection, +} + +impl SorterCacheDelAddCboRoaringBitmap { + pub fn new(cap: NonZeroUsize, sorter: grenad::Sorter, conn: redis::Connection) -> Self { + SorterCacheDelAddCboRoaringBitmap { + cache: LruCache::new(cap), + sorter, + deladd_buffer: Vec::new(), + cbo_buffer: Vec::new(), + conn, + } + } +} + +impl SorterCacheDelAddCboRoaringBitmap +where + MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result, U>, +{ + pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del, add: _ }) => { + del.get_or_insert_with(RoaringBitmap::new).insert(n); + Ok(()) + } + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del(n)) { + Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), + None => Ok(()), + }, + } + } + + pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del: _, add }) => { + add.get_or_insert_with(RoaringBitmap::new).insert(n); + Ok(()) + } + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_add(n)) { + Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), + None => Ok(()), + }, + } + } + + pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error> { + match self.cache.get_mut(key) { + Some(DelAddRoaringBitmap { del, add }) => { + del.get_or_insert_with(RoaringBitmap::new).insert(n); + add.get_or_insert_with(RoaringBitmap::new).insert(n); + Ok(()) + } + None => match self.cache.push(key.into(), DelAddRoaringBitmap::new_del_add(n)) { + Some((key, deladd)) => self.write_entry_to_sorter(key, deladd), + None => Ok(()), + }, + } + } + + fn write_entry_to_sorter( + &mut self, + key: SmallVec<[u8; N]>, + deladd: DelAddRoaringBitmap, + ) -> Result<(), grenad::Error> { + self.deladd_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer); + match deladd { + DelAddRoaringBitmap { del: Some(del), add: None } => { + self.cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer); + value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; + } + DelAddRoaringBitmap { del: None, add: Some(add) } => { + self.cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer); + value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; + } + DelAddRoaringBitmap { del: Some(del), add: Some(add) } => { + self.cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer); + value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?; + + self.cbo_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer); + value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?; + } + DelAddRoaringBitmap { del: None, add: None } => return Ok(()), + } + redis::cmd("INCR").arg(key.as_ref()).query::(&mut self.conn).unwrap(); + self.sorter.insert(key, value_writer.into_inner().unwrap()) + } + + pub fn into_sorter(mut self) -> Result, grenad::Error> { + let default_lru = LruCache::new(NonZeroUsize::MIN); + for (key, deladd) in mem::replace(&mut self.cache, default_lru) { + self.write_entry_to_sorter(key, deladd)?; + } + Ok(self.sorter) + } +} + +pub struct DelAddRoaringBitmap { + pub del: Option, + pub add: Option, +} + +impl DelAddRoaringBitmap { + fn new_del_add(n: u32) -> Self { + DelAddRoaringBitmap { + del: Some(RoaringBitmap::from([n])), + add: Some(RoaringBitmap::from([n])), + } + } + + fn new_del(n: u32) -> Self { + DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None } + } + + fn new_add(n: u32) -> Self { + DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) } + } +} diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 8b2e6683e..0a9a475bb 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -1,3 +1,4 @@ +mod cache; mod enrich; mod extract; mod helpers;