mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-06-14 00:29:50 +02:00
411 lines
14 KiB
Rust
411 lines
14 KiB
Rust
use std::borrow::Cow;
|
|
use std::fs::File;
|
|
use std::io::{self, Seek, SeekFrom};
|
|
use std::sync::mpsc::sync_channel;
|
|
use std::time::Instant;
|
|
|
|
use anyhow::Context;
|
|
use bstr::ByteSlice as _;
|
|
use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
|
|
use heed::types::ByteSlice;
|
|
use log::{debug, info, error};
|
|
use rayon::prelude::*;
|
|
use roaring::RoaringBitmap;
|
|
use structopt::StructOpt;
|
|
use tempfile::tempfile;
|
|
|
|
use crate::FieldsIdsMap;
|
|
use crate::index::Index;
|
|
use self::store::Store;
|
|
use self::merge_function::{
|
|
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
|
|
docid_word_positions_merge, documents_merge,
|
|
};
|
|
|
|
pub use self::transform::{Transform, TransformOutput};
|
|
|
|
mod merge_function;
|
|
mod store;
|
|
mod transform;
|
|
|
|
#[derive(Debug, Clone, StructOpt)]
|
|
pub struct IndexerOpt {
|
|
/// The amount of documents to skip before printing
|
|
/// a log regarding the indexing advancement.
|
|
#[structopt(long, default_value = "1000000")] // 1m
|
|
pub log_every_n: usize,
|
|
|
|
/// MTBL max number of chunks in bytes.
|
|
#[structopt(long)]
|
|
pub max_nb_chunks: Option<usize>,
|
|
|
|
/// The maximum amount of memory to use for the MTBL buffer. It is recommended
|
|
/// to use something like 80%-90% of the available memory.
|
|
///
|
|
/// It is automatically split by the number of jobs e.g. if you use 7 jobs
|
|
/// and 7 GB of max memory, each thread will use a maximum of 1 GB.
|
|
#[structopt(long, default_value = "7516192768")] // 7 GB
|
|
pub max_memory: usize,
|
|
|
|
/// Size of the linked hash map cache when indexing.
|
|
/// The bigger it is, the faster the indexing is but the more memory it takes.
|
|
#[structopt(long, default_value = "500")]
|
|
pub linked_hash_map_size: usize,
|
|
|
|
/// The name of the compression algorithm to use when compressing intermediate
|
|
/// chunks during indexing documents.
|
|
///
|
|
/// Choosing a fast algorithm will make the indexing faster but may consume more memory.
|
|
#[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
|
|
pub chunk_compression_type: CompressionType,
|
|
|
|
/// The level of compression of the chosen algorithm.
|
|
#[structopt(long, requires = "chunk-compression-type")]
|
|
pub chunk_compression_level: Option<u32>,
|
|
|
|
/// The number of bytes to remove from the begining of the chunks while reading/sorting
|
|
/// or merging them.
|
|
///
|
|
/// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`,
|
|
/// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set.
|
|
#[structopt(long, default_value = "4294967296")] // 4 GB
|
|
pub chunk_fusing_shrink_size: u64,
|
|
|
|
/// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2.
|
|
#[structopt(long)]
|
|
pub enable_chunk_fusing: bool,
|
|
|
|
/// Number of parallel jobs for indexing, defaults to # of CPUs.
|
|
#[structopt(long)]
|
|
pub indexing_jobs: Option<usize>,
|
|
}
|
|
|
|
#[derive(Debug, Copy, Clone)]
|
|
enum WriteMethod {
|
|
Append,
|
|
GetMergePut,
|
|
}
|
|
|
|
type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>;
|
|
|
|
fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
|
|
let mut builder = Writer::builder();
|
|
builder.compression_type(typ);
|
|
if let Some(level) = level {
|
|
builder.compression_level(level);
|
|
}
|
|
builder.build(file)
|
|
}
|
|
|
|
fn create_sorter(
|
|
merge: MergeFn,
|
|
chunk_compression_type: CompressionType,
|
|
chunk_compression_level: Option<u32>,
|
|
chunk_fusing_shrink_size: Option<u64>,
|
|
max_nb_chunks: Option<usize>,
|
|
max_memory: Option<usize>,
|
|
) -> Sorter<MergeFn>
|
|
{
|
|
let mut builder = Sorter::builder(merge);
|
|
if let Some(shrink_size) = chunk_fusing_shrink_size {
|
|
builder.file_fusing_shrink_size(shrink_size);
|
|
}
|
|
builder.chunk_compression_type(chunk_compression_type);
|
|
if let Some(level) = chunk_compression_level {
|
|
builder.chunk_compression_level(level);
|
|
}
|
|
if let Some(nb_chunks) = max_nb_chunks {
|
|
builder.max_nb_chunks(nb_chunks);
|
|
}
|
|
if let Some(memory) = max_memory {
|
|
builder.max_memory(memory);
|
|
}
|
|
builder.build()
|
|
}
|
|
|
|
fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
|
|
let mut file = writer.into_inner()?;
|
|
file.seek(SeekFrom::Start(0))?;
|
|
let file = if let Some(shrink_size) = shrink_size {
|
|
FileFuse::builder().shrink_size(shrink_size).build(file)
|
|
} else {
|
|
FileFuse::new(file)
|
|
};
|
|
Reader::new(file).map_err(Into::into)
|
|
}
|
|
|
|
fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> {
|
|
let mut builder = Merger::builder(merge);
|
|
builder.extend(sources);
|
|
builder.build()
|
|
}
|
|
|
|
fn merge_into_lmdb_database(
|
|
wtxn: &mut heed::RwTxn,
|
|
database: heed::PolyDatabase,
|
|
sources: Vec<Reader<FileFuse>>,
|
|
merge: MergeFn,
|
|
method: WriteMethod,
|
|
) -> anyhow::Result<()> {
|
|
debug!("Merging {} MTBL stores...", sources.len());
|
|
let before = Instant::now();
|
|
|
|
let merger = merge_readers(sources, merge);
|
|
let mut in_iter = merger.into_merge_iter()?;
|
|
|
|
match method {
|
|
WriteMethod::Append => {
|
|
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
|
while let Some((k, v)) = in_iter.next()? {
|
|
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
|
|
}
|
|
},
|
|
WriteMethod::GetMergePut => {
|
|
while let Some((k, v)) = in_iter.next()? {
|
|
match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? {
|
|
Some(old_val) => {
|
|
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
|
|
let val = merge(k, &vals).expect("merge failed");
|
|
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)?
|
|
},
|
|
None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?,
|
|
}
|
|
}
|
|
},
|
|
}
|
|
|
|
debug!("MTBL stores merged in {:.02?}!", before.elapsed());
|
|
Ok(())
|
|
}
|
|
|
|
fn write_into_lmdb_database(
|
|
wtxn: &mut heed::RwTxn,
|
|
database: heed::PolyDatabase,
|
|
mut reader: Reader<FileFuse>,
|
|
merge: MergeFn,
|
|
method: WriteMethod,
|
|
) -> anyhow::Result<()> {
|
|
debug!("Writing MTBL stores...");
|
|
let before = Instant::now();
|
|
|
|
match method {
|
|
WriteMethod::Append => {
|
|
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
|
while let Some((k, v)) = reader.next()? {
|
|
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
|
|
}
|
|
},
|
|
WriteMethod::GetMergePut => {
|
|
while let Some((k, v)) = reader.next()? {
|
|
match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? {
|
|
Some(old_val) => {
|
|
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
|
|
let val = merge(k, &vals).expect("merge failed");
|
|
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)?
|
|
},
|
|
None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
debug!("MTBL stores merged in {:.02?}!", before.elapsed());
|
|
Ok(())
|
|
}
|
|
|
|
pub fn run<F>(
|
|
env: &heed::Env,
|
|
index: &Index,
|
|
opt: &IndexerOpt,
|
|
fields_ids_map: FieldsIdsMap,
|
|
users_ids_documents_ids: fst::Map<Vec<u8>>,
|
|
new_documents_ids: RoaringBitmap,
|
|
documents: grenad::Reader<&[u8]>,
|
|
documents_count: u32,
|
|
progress_callback: F,
|
|
) -> anyhow::Result<()>
|
|
where F: Fn(u32, u32) + Sync + Send,
|
|
{
|
|
let jobs = opt.indexing_jobs.unwrap_or(0);
|
|
let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
|
|
pool.install(|| {
|
|
run_intern(
|
|
env,
|
|
index,
|
|
opt,
|
|
fields_ids_map,
|
|
users_ids_documents_ids,
|
|
new_documents_ids,
|
|
documents,
|
|
documents_count,
|
|
progress_callback,
|
|
)
|
|
})
|
|
}
|
|
|
|
fn run_intern<F>(
|
|
env: &heed::Env,
|
|
index: &Index,
|
|
opt: &IndexerOpt,
|
|
fields_ids_map: FieldsIdsMap,
|
|
users_ids_documents_ids: fst::Map<Vec<u8>>,
|
|
new_documents_ids: RoaringBitmap,
|
|
documents: grenad::Reader<&[u8]>,
|
|
documents_count: u32,
|
|
progress_callback: F,
|
|
) -> anyhow::Result<()>
|
|
where F: Fn(u32, u32) + Sync + Send,
|
|
{
|
|
let before_indexing = Instant::now();
|
|
let num_threads = rayon::current_num_threads();
|
|
let linked_hash_map_size = opt.linked_hash_map_size;
|
|
let max_nb_chunks = opt.max_nb_chunks;
|
|
let max_memory_by_job = opt.max_memory / num_threads;
|
|
let chunk_compression_type = opt.chunk_compression_type;
|
|
let chunk_compression_level = opt.chunk_compression_level;
|
|
let log_every_n = opt.log_every_n;
|
|
|
|
let chunk_fusing_shrink_size = if opt.enable_chunk_fusing {
|
|
Some(opt.chunk_fusing_shrink_size)
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let readers = rayon::iter::repeatn(documents, num_threads)
|
|
.enumerate()
|
|
.map(|(i, documents)| {
|
|
let store = Store::new(
|
|
linked_hash_map_size,
|
|
max_nb_chunks,
|
|
Some(max_memory_by_job),
|
|
chunk_compression_type,
|
|
chunk_compression_level,
|
|
chunk_fusing_shrink_size,
|
|
)?;
|
|
store.index(documents, documents_count, i, num_threads, log_every_n, &progress_callback)
|
|
})
|
|
.collect::<Result<Vec<_>, _>>()?;
|
|
|
|
let mut main_readers = Vec::with_capacity(readers.len());
|
|
let mut word_docids_readers = Vec::with_capacity(readers.len());
|
|
let mut docid_word_positions_readers = Vec::with_capacity(readers.len());
|
|
let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len());
|
|
let mut documents_readers = Vec::with_capacity(readers.len());
|
|
readers.into_iter().for_each(|readers| {
|
|
main_readers.push(readers.main);
|
|
word_docids_readers.push(readers.word_docids);
|
|
docid_word_positions_readers.push(readers.docid_word_positions);
|
|
words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids);
|
|
documents_readers.push(readers.documents);
|
|
});
|
|
|
|
// This is the function that merge the readers
|
|
// by using the given merge function.
|
|
let merge_readers = move |readers, merge| {
|
|
let mut writer = tempfile().and_then(|f| {
|
|
create_writer(chunk_compression_type, chunk_compression_level, f)
|
|
})?;
|
|
let merger = merge_readers(readers, merge);
|
|
merger.write_into(&mut writer)?;
|
|
writer_into_reader(writer, chunk_fusing_shrink_size)
|
|
};
|
|
|
|
// The enum and the channel which is used to transfert
|
|
// the readers merges potentially done on another thread.
|
|
enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids };
|
|
let (sender, receiver) = sync_channel(3);
|
|
|
|
debug!("Merging the main, word docids and words pairs proximity docids in parallel...");
|
|
rayon::spawn(move || {
|
|
vec![
|
|
(DatabaseType::Main, main_readers, main_merge as MergeFn),
|
|
(DatabaseType::WordDocids, word_docids_readers, word_docids_merge),
|
|
(
|
|
DatabaseType::WordsPairsProximitiesDocids,
|
|
words_pairs_proximities_docids_readers,
|
|
words_pairs_proximities_docids_merge,
|
|
),
|
|
]
|
|
.into_par_iter()
|
|
.for_each(|(dbtype, readers, merge)| {
|
|
let result = merge_readers(readers, merge);
|
|
if let Err(e) = sender.send((dbtype, result)) {
|
|
error!("sender error: {}", e);
|
|
}
|
|
});
|
|
});
|
|
|
|
// We create the write transaction of this update.
|
|
// TODO we must get this transaction as an argument to be able
|
|
// to first delete the replaced documents for example.
|
|
let mut wtxn = env.write_txn()?;
|
|
|
|
let mut documents_ids = index.documents_ids(&wtxn)?;
|
|
let contains_documents = !documents_ids.is_empty();
|
|
let write_method = if contains_documents {
|
|
WriteMethod::GetMergePut
|
|
} else {
|
|
WriteMethod::Append
|
|
};
|
|
|
|
// We write the fields ids map into the main database
|
|
index.put_fields_ids_map(&mut wtxn, &fields_ids_map)?;
|
|
|
|
// We write the users_ids_documents_ids into the main database.
|
|
index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?;
|
|
|
|
// We merge the new documents ids with the existing ones.
|
|
documents_ids.union_with(&new_documents_ids);
|
|
index.put_documents_ids(&mut wtxn, &documents_ids)?;
|
|
|
|
debug!("Writing the docid word positions into LMDB on disk...");
|
|
merge_into_lmdb_database(
|
|
&mut wtxn,
|
|
*index.docid_word_positions.as_polymorph(),
|
|
docid_word_positions_readers,
|
|
docid_word_positions_merge,
|
|
write_method
|
|
)?;
|
|
|
|
debug!("Writing the documents into LMDB on disk...");
|
|
merge_into_lmdb_database(
|
|
&mut wtxn,
|
|
*index.documents.as_polymorph(),
|
|
documents_readers,
|
|
documents_merge,
|
|
write_method
|
|
)?;
|
|
|
|
for (db_type, result) in receiver {
|
|
let content = result?;
|
|
match db_type {
|
|
DatabaseType::Main => {
|
|
debug!("Writing the main elements into LMDB on disk...");
|
|
write_into_lmdb_database(&mut wtxn, index.main, content, main_merge, write_method)?;
|
|
},
|
|
DatabaseType::WordDocids => {
|
|
debug!("Writing the words docids into LMDB on disk...");
|
|
let db = *index.word_docids.as_polymorph();
|
|
write_into_lmdb_database(&mut wtxn, db, content, word_docids_merge, write_method)?;
|
|
},
|
|
DatabaseType::WordsPairsProximitiesDocids => {
|
|
debug!("Writing the words pairs proximities docids into LMDB on disk...");
|
|
let db = *index.word_pair_proximity_docids.as_polymorph();
|
|
write_into_lmdb_database(
|
|
&mut wtxn,
|
|
db,
|
|
content,
|
|
words_pairs_proximities_docids_merge,
|
|
write_method,
|
|
)?;
|
|
},
|
|
}
|
|
}
|
|
|
|
wtxn.commit()?;
|
|
|
|
info!("Update processed in {:.02?}", before_indexing.elapsed());
|
|
|
|
Ok(())
|
|
}
|