mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-02-23 02:38:28 +01:00
Chunk the keys to process them in parallel
This commit is contained in:
parent
803a699b15
commit
081f614a5e
@ -1,6 +1,10 @@
|
|||||||
use std::fs::{read_dir, read_to_string, remove_file, File};
|
use std::fs::{read_dir, read_to_string, remove_file, File};
|
||||||
|
use std::hint::black_box;
|
||||||
use std::io::{BufWriter, Write as _};
|
use std::io::{BufWriter, Write as _};
|
||||||
|
use std::ops::Bound;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
use std::thread;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
@ -640,28 +644,70 @@ fn hair_dryer(
|
|||||||
for part in index_parts {
|
for part in index_parts {
|
||||||
match part {
|
match part {
|
||||||
IndexPart::Arroy => {
|
IndexPart::Arroy => {
|
||||||
let mut count = 0;
|
// It would be better if the thread count were a command parameter
|
||||||
let total = index.vector_arroy.len(&rtxn)?;
|
let total_threads = thread::available_parallelism().unwrap().get() * 10;
|
||||||
eprintln!("Hair drying arroy for {uid}...");
|
eprintln!("Hair drying arroy for {uid} using {total_threads} threads...");
|
||||||
for (i, result) in index
|
|
||||||
.vector_arroy
|
let database = index.vector_arroy.remap_types::<Bytes, Bytes>();
|
||||||
.remap_types::<Bytes, Bytes>()
|
let num_keys = database.len(&rtxn)? as usize;
|
||||||
.iter(&rtxn)?
|
let first_entry = database.iter(&rtxn)?.next().transpose()?;
|
||||||
.enumerate()
|
let last_entry = database.rev_iter(&rtxn)?.next().transpose()?;
|
||||||
{
|
let keys_by_thread = num_keys / total_threads;
|
||||||
|
|
||||||
|
let Some(((first_key, _), (last_key, _))) = first_entry.zip(last_entry)
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let first_key_num = first_key.try_into().map(u64::from_be_bytes).unwrap();
|
||||||
|
let last_key_num = last_key.try_into().map(u64::from_be_bytes).unwrap();
|
||||||
|
|
||||||
|
eprintln!("between {first_key_num:x} and {last_key_num:x}");
|
||||||
|
eprintln!("Iterating over {keys_by_thread} entries by thread...");
|
||||||
|
|
||||||
|
let progress = AtomicUsize::new(0);
|
||||||
|
let count = thread::scope(|s| -> anyhow::Result<usize> {
|
||||||
|
let mut handles = Vec::new();
|
||||||
|
|
||||||
|
for tid in 0..total_threads {
|
||||||
|
let index = &index;
|
||||||
|
let progress = &progress;
|
||||||
|
let handle = s.spawn(move || -> anyhow::Result<usize> {
|
||||||
|
let rtxn = index.read_txn()?;
|
||||||
|
let start = first_key_num + (keys_by_thread * tid) as u64;
|
||||||
|
let start_bytes = start.to_be_bytes();
|
||||||
|
let range = (Bound::Included(&start_bytes[..]), Bound::Unbounded);
|
||||||
|
|
||||||
|
let mut count: usize = 0;
|
||||||
|
for result in database.range(&rtxn, &range)?.take(keys_by_thread) {
|
||||||
let (key, value) = result?;
|
let (key, value) = result?;
|
||||||
|
|
||||||
// All of this just to avoid compiler optimizations 🤞
|
// All of this just to avoid compiler optimizations 🤞
|
||||||
// We must read all the bytes to make the pages hot in cache.
|
// We must read all the bytes to make the pages hot in cache.
|
||||||
// <https://doc.rust-lang.org/std/hint/fn.black_box.html>
|
// <https://doc.rust-lang.org/std/hint/fn.black_box.html>
|
||||||
count += std::hint::black_box(key.iter().fold(0, |acc, _| acc + 1));
|
count += black_box(key.iter().fold(0, |acc, _| acc + 1));
|
||||||
count += std::hint::black_box(value.iter().fold(0, |acc, _| acc + 1));
|
count += black_box(value.iter().fold(0, |acc, _| acc + 1));
|
||||||
|
|
||||||
if i % 10_000 == 0 {
|
let current_progress = progress.fetch_add(1, Ordering::Relaxed);
|
||||||
let perc = (i as f64) / (total as f64) * 100.0;
|
if current_progress % 10_000 == 0 {
|
||||||
eprintln!("Visited {i}/{total} ({perc:.2}%) keys")
|
let perc = (current_progress as f64) / (num_keys as f64) * 100.0;
|
||||||
|
eprintln!("Visited {current_progress}/{num_keys} ({perc:.2}%) keys");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
});
|
||||||
|
|
||||||
|
handles.push(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut count = 0usize;
|
||||||
|
for handle in handles {
|
||||||
|
count += handle.join().unwrap()?;
|
||||||
|
}
|
||||||
|
Ok(count)
|
||||||
|
})?;
|
||||||
|
|
||||||
eprintln!("Done hair drying a total of at least {count} bytes.");
|
eprintln!("Done hair drying a total of at least {count} bytes.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user