From 437940d0535471ea86b11f8101eadc4cb5226d83 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Thu, 24 Oct 2024 15:26:42 +0200
Subject: [PATCH] Describe the multi-threaded cache merging

---
 milli/src/update/new/extract/cache.rs | 56 +++++++++++++++++++--------
 1 file changed, 39 insertions(+), 17 deletions(-)

diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs
index 470aa6867..0f643e764 100644
--- a/milli/src/update/new/extract/cache.rs
+++ b/milli/src/update/new/extract/cache.rs
@@ -29,25 +29,47 @@ pub struct CboCachedSorter<'extractor> {
 
 // # How the Merge Algorithm works
 //
-// - Collect all hashmaps to the main thread
-// - Iterator over all the hashmaps in the different threads
-// - Each thread must take care of its own keys (regarding a hash number)
-// - Also read the spilled content which are inside
-// - Each thread must populate a local hashmap with the entries
-// - Every thread send the merged content to the main writing thread
+// Each extractor creates #Threads caches and balances the entries
+// it processes between them based on the hash of the keys. To do so
+// we can use hashbrown::hash_map::RawEntryBuilderMut::from_key_hashed_nocheck.
+// This way we compute the hash ourselves, use it to pick the cache to
+// target, and insert the entry into the right HashMap.
 //
-// ## Next Step
+// #Thread -> caches
+// t1 -> [t1c1, t1c2, t1c3]
+// t2 -> [t2c1, t2c2, t2c3]
+// t3 -> [t3c1, t3c2, t3c3]
 //
-// - Define the size of the buckets in advance to make sure everything fits in memory.
-// ```
-// let total_buckets = 32;
-// (0..total_buckets).par_iter().for_each(|n| {
-//     let hash = todo!();
-//     if hash % total_bucket == n {
-//         // take care of this key
-//     }
-// });
-// ```
+// When the extractors are done filling the caches, we want to merge
+// the content of all of them. To do so we transpose the matrix of
+// caches: each merging thread takes the caches that share its index.
+// This way we know that a given key always ends up in caches with the
+// same index and can only collide with the same key from other threads.
+//
+// #Thread -> caches
+// t1 -> [t1c1, t2c1, t3c1]
+// t2 -> [t1c2, t2c2, t3c2]
+// t3 -> [t1c3, t2c3, t3c3]
+//
+// When we encounter a key that is missing from the in-memory caches
+// we must still look for it in the spilled entries. This is why we use
+// a grenad sorter/reader: it lets us seek "efficiently" for a key.
+//
+// ## Memory Control
+//
+// We can detect that no more memory is available when the bump
+// allocator reaches a threshold. When this is the case we freeze the
+// cache. There is one bump allocator per thread and the memory must
+// stay well balanced, as we manage one type of extraction at a time
+// and feed the threads with well-balanced batches of documents.
+//
+// Freezing means that new, unknown keys added to the cache are
+// directly spilled to disk: basically a key followed by a del/add
+// bitmap. Keys that are already known keep being modified in their
+// materialized version in the cache: we only update the del/add bitmaps.
+//
+// For now we use a grenad sorter for spilling, even though I think
+// it is not the most efficient way (too many open files, entries get sorted).
 
 impl<'extractor> CboCachedSorter<'extractor> {
     /// TODO may add the capacity
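The balancing described in the first paragraph of the new comment can be sketched as follows. This is a minimal illustration rather than the crate's actual code: the `insert_or_merge` name and the `u32` value type are placeholders for the real del/add bitmap handling, and it assumes a hashbrown version where the raw-entry API (`raw_entry_mut`) is available.

```
use std::hash::{BuildHasher, Hash, Hasher};

use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;

/// Hypothetical helper: route an entry to one of the caches of the current
/// extractor based on the hash of its key, and reuse that hash to insert it.
/// Assumes `caches` is non-empty and that all caches share the same BuildHasher.
fn insert_or_merge(caches: &mut [HashMap<Vec<u8>, u32>], key: &[u8], delta: u32) {
    // Compute the hash once, with the hasher of the maps themselves.
    let mut hasher = caches[0].hasher().build_hasher();
    key.hash(&mut hasher);
    let hash = hasher.finish();

    // The target cache only depends on the hash, so a given key always
    // lands in the cache with the same index, whatever the thread.
    let index = (hash as usize) % caches.len();

    match caches[index].raw_entry_mut().from_key_hashed_nocheck(hash, key) {
        // Known key: merge into the materialized value.
        RawEntryMut::Occupied(mut entry) => *entry.get_mut() += delta,
        // Unknown key: insert it without recomputing the hash.
        RawEntryMut::Vacant(entry) => {
            entry.insert_hashed_nocheck(hash, key.to_vec(), delta);
        }
    }
}
```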
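The transpose between the two diagrams is a plain reshuffle of the caches; a minimal sketch, with `Cache` standing in for whatever per-thread cache type is used:

```
/// Turn `caches[thread][index]` into `buckets[index][thread]` so that each
/// merging thread receives every cache that can contain its share of keys.
fn transpose<Cache>(caches: Vec<Vec<Cache>>) -> Vec<Vec<Cache>> {
    let num_buckets = caches.first().map_or(0, Vec::len);
    let mut buckets: Vec<Vec<Cache>> = (0..num_buckets).map(|_| Vec::new()).collect();
    for thread_caches in caches {
        // Every inner Vec is expected to have the same length (#Threads).
        for (index, cache) in thread_caches.into_iter().enumerate() {
            buckets[index].push(cache);
        }
    }
    buckets
}
```

Each bucket of the returned `Vec` can then be handed to its own merging task, for example with rayon's `into_par_iter`, which matches the per-thread assignment shown in the second diagram.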
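The freezing rule of the Memory Control section could look like the sketch below. The `FrozenAwareCache` type and its fields are hypothetical, the `u32` again stands in for the del/add bitmap, and a plain `std::io::Write` sink replaces the grenad sorter used for spilling; only bumpalo's `Bump::allocated_bytes` is assumed from the real dependencies.

```
use std::io::{self, Write};

use bumpalo::Bump;
use hashbrown::HashMap;

/// Hypothetical per-thread cache that freezes itself once the bump
/// allocator crosses the memory budget.
struct FrozenAwareCache<'extractor, W> {
    alloc: &'extractor Bump,
    max_memory: usize,
    in_memory: HashMap<&'extractor [u8], u32>,
    spill: W,
}

impl<'extractor, W: Write> FrozenAwareCache<'extractor, W> {
    fn insert(&mut self, key: &[u8], delta: u32) -> io::Result<()> {
        // Keys that are already materialized keep being updated in place,
        // even after the cache is frozen.
        if let Some(value) = self.in_memory.get_mut(key) {
            *value += delta;
            return Ok(());
        }

        if self.alloc.allocated_bytes() >= self.max_memory {
            // Frozen: unknown keys go straight to disk as a length-prefixed
            // key followed by its serialized value.
            self.spill.write_all(&(key.len() as u32).to_le_bytes())?;
            self.spill.write_all(key)?;
            self.spill.write_all(&delta.to_le_bytes())?;
        } else {
            // Not frozen yet: copy the key into the bump so that its size
            // counts towards the memory threshold.
            let key = self.alloc.alloc_slice_copy(key);
            self.in_memory.insert(key, delta);
        }

        Ok(())
    }
}
```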