mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-08 20:44:30 +01:00
Use the Bbbul crate in the cache to better control memory
This commit is contained in:
parent
8b260de5a0
commit
a9ecbf0b64
12
Cargo.lock
generated
12
Cargo.lock
generated
@ -572,6 +572,15 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitpacking"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c1d3e2bfd8d06048a179f7b17afc3188effa10385e7b00dc65af6aae732ea92"
|
||||
dependencies = [
|
||||
"crunchy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitvec"
|
||||
version = "1.0.1"
|
||||
@ -4430,9 +4439,10 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "raw-collections"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/dureuill/raw-collections.git#4ab9619207632c20f4e0c2e126d9d909cc58ef65"
|
||||
source = "git+https://github.com/dureuill/raw-collections.git#48801130cb16d758ba9c610b0fe25747e4d85534"
|
||||
dependencies = [
|
||||
"allocator-api2",
|
||||
"bitpacking",
|
||||
"bumpalo",
|
||||
"hashbrown 0.15.0",
|
||||
"serde",
|
||||
|
@ -72,7 +72,9 @@ use bumpalo::Bump;
|
||||
use grenad::ReaderCursor;
|
||||
use hashbrown::hash_map::RawEntryMut;
|
||||
use hashbrown::HashMap;
|
||||
use raw_collections::bbbul::{BitPacker, BitPacker4x};
|
||||
use raw_collections::map::FrozenMap;
|
||||
use raw_collections::{Bbbul, FrozenBbbul};
|
||||
use roaring::RoaringBitmap;
|
||||
use rustc_hash::FxBuildHasher;
|
||||
|
||||
@ -130,7 +132,7 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
Ok(())
|
||||
}
|
||||
InnerCaches::Spilling(spilling) => {
|
||||
spilling.insert_del_u32(&self.hasher, buckets, key, n)
|
||||
spilling.insert_del_u32(&self.hasher, self.alloc, buckets, key, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -147,7 +149,7 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
Ok(())
|
||||
}
|
||||
InnerCaches::Spilling(spilling) => {
|
||||
spilling.insert_add_u32(&self.hasher, buckets, key, n)
|
||||
spilling.insert_add_u32(&self.hasher, self.alloc, buckets, key, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -165,7 +167,7 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
);
|
||||
|
||||
let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
|
||||
eprintln!("The last allocated HasMap took {allocated} bytes");
|
||||
eprintln!("The last allocated HashMap took {allocated} bytes");
|
||||
|
||||
let dummy = NormalCaches { caches: Vec::new() };
|
||||
let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy);
|
||||
@ -181,6 +183,24 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
.iter_mut()
|
||||
.enumerate()
|
||||
.map(|(bucket, map)| {
|
||||
// safety: we are transmuting the Bbbul into a FrozenBbbul
|
||||
// that are the same size.
|
||||
let map = unsafe {
|
||||
std::mem::transmute::<
|
||||
&mut HashMap<
|
||||
&[u8],
|
||||
DelAddBbbul<BitPacker4x>, // from this
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
&mut HashMap<
|
||||
&[u8],
|
||||
FrozenDelAddBbbul<BitPacker4x>, // to that
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
>(map)
|
||||
};
|
||||
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
|
||||
})
|
||||
.collect(),
|
||||
@ -196,6 +216,24 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
.map(BufReader::new)
|
||||
.map(|bufreader| grenad::Reader::new(bufreader).map_err(Into::into))
|
||||
.collect::<Result<_>>()?;
|
||||
// safety: we are transmuting the Bbbul into a FrozenBbbul
|
||||
// that are the same size.
|
||||
let map = unsafe {
|
||||
std::mem::transmute::<
|
||||
&mut HashMap<
|
||||
&[u8],
|
||||
DelAddBbbul<BitPacker4x>, // from this
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
&mut HashMap<
|
||||
&[u8],
|
||||
FrozenDelAddBbbul<BitPacker4x>, // to that
|
||||
FxBuildHasher,
|
||||
&Bump,
|
||||
>,
|
||||
>(map)
|
||||
};
|
||||
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
|
||||
})
|
||||
.collect(),
|
||||
@ -206,7 +244,14 @@ impl<'extractor> BalancedCaches<'extractor> {
|
||||
unsafe impl MostlySend for BalancedCaches<'_> {}
|
||||
|
||||
struct NormalCaches<'extractor> {
|
||||
caches: Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>>,
|
||||
caches: Vec<
|
||||
HashMap<
|
||||
&'extractor [u8],
|
||||
DelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
&'extractor Bump,
|
||||
>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl<'extractor> NormalCaches<'extractor> {
|
||||
@ -223,13 +268,13 @@ impl<'extractor> NormalCaches<'extractor> {
|
||||
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
|
||||
entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
}
|
||||
RawEntryMut::Vacant(entry) => {
|
||||
entry.insert_hashed_nocheck(
|
||||
hash,
|
||||
alloc.alloc_slice_copy(key),
|
||||
DelAddRoaringBitmap::new_del_u32(n),
|
||||
DelAddBbbul::new_del_u32_in(n, alloc),
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -247,13 +292,13 @@ impl<'extractor> NormalCaches<'extractor> {
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
|
||||
entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
}
|
||||
RawEntryMut::Vacant(entry) => {
|
||||
entry.insert_hashed_nocheck(
|
||||
hash,
|
||||
alloc.alloc_slice_copy(key),
|
||||
DelAddRoaringBitmap::new_add_u32(n),
|
||||
DelAddBbbul::new_add_u32_in(n, alloc),
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -261,7 +306,14 @@ impl<'extractor> NormalCaches<'extractor> {
|
||||
}
|
||||
|
||||
struct SpillingCaches<'extractor> {
|
||||
caches: Vec<HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>>,
|
||||
caches: Vec<
|
||||
HashMap<
|
||||
&'extractor [u8],
|
||||
DelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
&'extractor Bump,
|
||||
>,
|
||||
>,
|
||||
spilled_entries: Vec<grenad::Sorter<MergeDeladdCboRoaringBitmaps>>,
|
||||
deladd_buffer: Vec<u8>,
|
||||
cbo_buffer: Vec<u8>,
|
||||
@ -270,7 +322,12 @@ struct SpillingCaches<'extractor> {
|
||||
impl<'extractor> SpillingCaches<'extractor> {
|
||||
fn from_cache_maps(
|
||||
caches: Vec<
|
||||
HashMap<&'extractor [u8], DelAddRoaringBitmap, FxBuildHasher, &'extractor Bump>,
|
||||
HashMap<
|
||||
&'extractor [u8],
|
||||
DelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
&'extractor Bump,
|
||||
>,
|
||||
>,
|
||||
) -> SpillingCaches<'extractor> {
|
||||
SpillingCaches {
|
||||
@ -291,6 +348,7 @@ impl<'extractor> SpillingCaches<'extractor> {
|
||||
pub fn insert_del_u32(
|
||||
&mut self,
|
||||
hasher: &FxBuildHasher,
|
||||
alloc: &'extractor Bump,
|
||||
buckets: usize,
|
||||
key: &[u8],
|
||||
n: u32,
|
||||
@ -299,25 +357,23 @@ impl<'extractor> SpillingCaches<'extractor> {
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().del.get_or_insert_with(RoaringBitmap::default).insert(n);
|
||||
entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
Ok(())
|
||||
}
|
||||
RawEntryMut::Vacant(_entry) => {
|
||||
let deladd = DelAddRoaringBitmap::new_del_u32(n);
|
||||
spill_entry_to_sorter(
|
||||
&mut self.spilled_entries[bucket],
|
||||
&mut self.deladd_buffer,
|
||||
&mut self.cbo_buffer,
|
||||
key,
|
||||
deladd,
|
||||
)
|
||||
}
|
||||
RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
|
||||
&mut self.spilled_entries[bucket],
|
||||
&mut self.deladd_buffer,
|
||||
&mut self.cbo_buffer,
|
||||
key,
|
||||
DelAddRoaringBitmap::new_del_u32(n),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_add_u32(
|
||||
&mut self,
|
||||
hasher: &FxBuildHasher,
|
||||
alloc: &'extractor Bump,
|
||||
buckets: usize,
|
||||
key: &[u8],
|
||||
n: u32,
|
||||
@ -326,19 +382,16 @@ impl<'extractor> SpillingCaches<'extractor> {
|
||||
let bucket = compute_bucket_from_hash(buckets, hash);
|
||||
match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
|
||||
RawEntryMut::Occupied(mut entry) => {
|
||||
entry.get_mut().add.get_or_insert_with(RoaringBitmap::default).insert(n);
|
||||
entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
|
||||
Ok(())
|
||||
}
|
||||
RawEntryMut::Vacant(_entry) => {
|
||||
let deladd = DelAddRoaringBitmap::new_add_u32(n);
|
||||
spill_entry_to_sorter(
|
||||
&mut self.spilled_entries[bucket],
|
||||
&mut self.deladd_buffer,
|
||||
&mut self.cbo_buffer,
|
||||
key,
|
||||
deladd,
|
||||
)
|
||||
}
|
||||
RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
|
||||
&mut self.spilled_entries[bucket],
|
||||
&mut self.deladd_buffer,
|
||||
&mut self.cbo_buffer,
|
||||
key,
|
||||
DelAddRoaringBitmap::new_add_u32(n),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -387,7 +440,13 @@ fn spill_entry_to_sorter(
|
||||
|
||||
pub struct FrozenCache<'a, 'extractor> {
|
||||
bucket: usize,
|
||||
cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, FxBuildHasher>,
|
||||
cache: FrozenMap<
|
||||
'a,
|
||||
'extractor,
|
||||
&'extractor [u8],
|
||||
FrozenDelAddBbbul<'extractor, BitPacker4x>,
|
||||
FxBuildHasher,
|
||||
>,
|
||||
spilled: Vec<grenad::Reader<BufReader<File>>>,
|
||||
}
|
||||
|
||||
@ -467,7 +526,7 @@ where
|
||||
for (map_index, map) in maps.iter_mut().enumerate() {
|
||||
if first_entry.source_index != map_index {
|
||||
if let Some(new) = map.get_mut(first_key) {
|
||||
output = output.merge(mem::take(new));
|
||||
output.append_and_clear_bbbul(new);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -483,14 +542,15 @@ where
|
||||
|
||||
// Then manage the content on the HashMap entries that weren't taken (mem::take).
|
||||
while let Some(mut map) = maps.pop() {
|
||||
for (key, output) in map.iter_mut() {
|
||||
let mut output = mem::take(output);
|
||||
for (key, bbbul) in map.iter_mut() {
|
||||
let mut output = DelAddRoaringBitmap::empty();
|
||||
output.append_and_clear_bbbul(bbbul);
|
||||
|
||||
// Make sure we don't try to work with entries already managed by the spilled
|
||||
if !output.is_empty() {
|
||||
if !bbbul.is_empty() {
|
||||
for rhs in maps.iter_mut() {
|
||||
if let Some(new) = rhs.get_mut(key) {
|
||||
output = output.merge(mem::take(new));
|
||||
output.append_and_clear_bbbul(new);
|
||||
}
|
||||
}
|
||||
|
||||
@ -530,6 +590,44 @@ impl<R> PartialOrd for Entry<R> {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DelAddBbbul<'bump, B> {
|
||||
pub del: Option<Bbbul<'bump, B>>,
|
||||
pub add: Option<Bbbul<'bump, B>>,
|
||||
}
|
||||
|
||||
impl<'bump, B: BitPacker> DelAddBbbul<'bump, B> {
|
||||
pub fn insert_del_u32_in(&mut self, n: u32, bump: &'bump Bump) {
|
||||
self.del.get_or_insert_with(|| Bbbul::new_in(bump)).insert(n);
|
||||
}
|
||||
|
||||
pub fn insert_add_u32_in(&mut self, n: u32, bump: &'bump Bump) {
|
||||
self.add.get_or_insert_with(|| Bbbul::new_in(bump)).insert(n);
|
||||
}
|
||||
|
||||
pub fn new_del_u32_in(n: u32, bump: &'bump Bump) -> Self {
|
||||
let mut bbbul = Bbbul::new_in(bump);
|
||||
bbbul.insert(n);
|
||||
DelAddBbbul { del: Some(bbbul), add: None }
|
||||
}
|
||||
|
||||
pub fn new_add_u32_in(n: u32, bump: &'bump Bump) -> Self {
|
||||
let mut bbbul = Bbbul::new_in(bump);
|
||||
bbbul.insert(n);
|
||||
DelAddBbbul { del: None, add: Some(bbbul) }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FrozenDelAddBbbul<'bump, B> {
|
||||
pub del: Option<FrozenBbbul<'bump, B>>,
|
||||
pub add: Option<FrozenBbbul<'bump, B>>,
|
||||
}
|
||||
|
||||
impl<'bump, B> FrozenDelAddBbbul<'bump, B> {
|
||||
fn is_empty(&self) -> bool {
|
||||
self.del.is_none() && self.add.is_none()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct DelAddRoaringBitmap {
|
||||
pub del: Option<RoaringBitmap>,
|
||||
@ -578,6 +676,26 @@ impl DelAddRoaringBitmap {
|
||||
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
|
||||
}
|
||||
|
||||
pub fn append_and_clear_bbbul<B: BitPacker>(&mut self, bbbul: &mut FrozenDelAddBbbul<'_, B>) {
|
||||
let FrozenDelAddBbbul { del, add } = bbbul;
|
||||
|
||||
if let Some(ref mut bbbul) = del.take() {
|
||||
let del = self.del.get_or_insert_with(RoaringBitmap::new);
|
||||
let mut iter = bbbul.iter_and_clear();
|
||||
while let Some(block) = iter.next_block() {
|
||||
del.append(block.iter().copied());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut bbbul) = add.take() {
|
||||
let add = self.add.get_or_insert_with(RoaringBitmap::new);
|
||||
let mut iter = bbbul.iter_and_clear();
|
||||
while let Some(block) = iter.next_block() {
|
||||
add.append(block.iter().copied());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge(self, rhs: DelAddRoaringBitmap) -> DelAddRoaringBitmap {
|
||||
let DelAddRoaringBitmap { del, add } = self;
|
||||
let DelAddRoaringBitmap { del: ndel, add: nadd } = rhs;
|
||||
|
Loading…
x
Reference in New Issue
Block a user