Introduce the Transform type into the indexing system

Clément Renault 2020-10-24 16:23:08 +02:00
parent b44b04d25b
commit a7a4984175
8 changed files with 173 additions and 251 deletions


@@ -1,26 +1,29 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};
use std::io::{self, Seek, SeekFrom};
use std::sync::mpsc::sync_channel;
use std::time::Instant;
use anyhow::Context;
use bstr::ByteSlice as _;
use flate2::read::GzDecoder;
use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use heed::types::ByteSlice;
use log::{debug, info, error};
use rayon::prelude::*;
use roaring::RoaringBitmap;
use structopt::StructOpt;
use tempfile::tempfile;
use crate::Index;
use crate::FieldsIdsMap;
use crate::index::Index;
use self::store::Store;
use self::merge_function::{
    main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
    docid_word_positions_merge, documents_merge,
};
pub use self::transform::{Transform, TransformOutput};
mod merge_function;
mod store;
mod transform;
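Note: the `Transform` type itself lives in the new `transform` module and is not part of this hunk. Below is a hypothetical sketch of what its output might carry, inferred only from the parameters that `run` now accepts further down; the actual definition may differ.

// Hypothetical sketch of TransformOutput, inferred from the new `run`
// parameters below; not the actual definition from the `transform` module.
pub struct TransformOutput {
    pub fields_ids_map: FieldsIdsMap,
    pub users_ids_documents_ids: fst::Map<Vec<u8>>,
    pub new_documents_ids: RoaringBitmap,
    pub documents_count: u32,
    // ...plus the serialized documents themselves (e.g. a grenad file).
}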
@@ -30,11 +33,11 @@ pub struct IndexerOpt {
    /// The number of documents to process before printing
    /// a log regarding the indexing progress.
    #[structopt(long, default_value = "1000000")] // 1m
    log_every_n: usize,
    pub log_every_n: usize,
    /// The maximum number of MTBL chunks.
    #[structopt(long)]
    max_nb_chunks: Option<usize>,
    pub max_nb_chunks: Option<usize>,
    /// The maximum amount of memory to use for the MTBL buffer. It is recommended
    /// to use something like 80%-90% of the available memory.
@@ -42,23 +45,23 @@ pub struct IndexerOpt {
    /// It is automatically split by the number of jobs, e.g. if you use 7 jobs
    /// and 7 GB of max memory, each thread will use a maximum of 1 GB.
    #[structopt(long, default_value = "7516192768")] // 7 GB
    max_memory: usize,
    pub max_memory: usize,
    /// Size of the linked hash map cache when indexing.
    /// The bigger it is, the faster the indexing is, but the more memory it takes.
    #[structopt(long, default_value = "500")]
    linked_hash_map_size: usize,
    pub linked_hash_map_size: usize,
    /// The name of the compression algorithm to use when compressing intermediate
    /// chunks during indexing documents.
    ///
    /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
    #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
    chunk_compression_type: CompressionType,
    pub chunk_compression_type: CompressionType,
    /// The level of compression of the chosen algorithm.
    #[structopt(long, requires = "chunk-compression-type")]
    chunk_compression_level: Option<u32>,
    pub chunk_compression_level: Option<u32>,
    /// The number of bytes to remove from the beginning of the chunks while reading/sorting
    /// or merging them.
@@ -66,15 +69,15 @@ pub struct IndexerOpt {
    /// File fusing must only be enabled on file systems that support `FALLOC_FL_COLLAPSE_RANGE`
    /// (i.e. ext4 and XFS), and will only work if `enable-chunk-fusing` is set.
    #[structopt(long, default_value = "4294967296")] // 4 GB
    chunk_fusing_shrink_size: u64,
    pub chunk_fusing_shrink_size: u64,
    /// Whether to enable chunk fusing, which reduces the amount of disk space used by a factor of 2.
    #[structopt(long)]
    enable_chunk_fusing: bool,
    pub enable_chunk_fusing: bool,
    /// Number of parallel jobs for indexing, defaults to # of CPUs.
    #[structopt(long)]
    indexing_jobs: Option<usize>,
    pub indexing_jobs: Option<usize>,
}
#[derive(Debug, Copy, Clone)]
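The `max_memory` doc comment above describes a per-job split. A minimal sketch of that arithmetic, assuming it is a plain integer division (the helper name is illustrative, not part of this commit):

// Illustrative helper, assuming the split is a plain integer division.
fn max_memory_per_thread(max_memory: usize, indexing_jobs: usize) -> usize {
    max_memory / indexing_jobs.max(1)
}

// 7 GB split across 7 jobs leaves each thread roughly 1 GB:
// max_memory_per_thread(7_516_192_768, 7) == 1_073_741_824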
@@ -210,51 +213,48 @@ fn write_into_lmdb_database(
    Ok(())
}
fn csv_bytes_readers<'a>(
    content: &'a [u8],
    gzipped: bool,
    count: usize,
) -> Vec<csv::Reader<Box<dyn Read + Send + 'a>>>
{
    let mut readers = Vec::new();
    for _ in 0..count {
        let content = if gzipped {
            Box::new(GzDecoder::new(content)) as Box<dyn Read + Send>
        } else {
            Box::new(content) as Box<dyn Read + Send>
        };
        let reader = csv::Reader::from_reader(content);
        readers.push(reader);
    }
    readers
}
pub fn run<'a, F>(
pub fn run<F>(
    env: &heed::Env,
    index: &Index,
    opt: &IndexerOpt,
    content: &'a [u8],
    gzipped: bool,
    fields_ids_map: FieldsIdsMap,
    users_ids_documents_ids: fst::Map<Vec<u8>>,
    new_documents_ids: RoaringBitmap,
    documents: grenad::Reader<&[u8]>,
    documents_count: u32,
    progress_callback: F,
) -> anyhow::Result<()>
where F: Fn(u32) + Sync + Send,
where F: Fn(u32, u32) + Sync + Send,
{
    let jobs = opt.indexing_jobs.unwrap_or(0);
    let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
    pool.install(|| run_intern(env, index, opt, content, gzipped, progress_callback))
    pool.install(|| {
        run_intern(
            env,
            index,
            opt,
            fields_ids_map,
            users_ids_documents_ids,
            new_documents_ids,
            documents,
            documents_count,
            progress_callback,
        )
    })
}
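A hypothetical call site for the new signature, assuming the `TransformOutput` shape sketched earlier; note that the progress callback now takes two counters, which appear to be the number of documents processed and the total expected (see the `store.index` call below):

// Hypothetical call site; `output` and the callback semantics are assumptions.
run(
    &env,
    &index,
    &opt,
    output.fields_ids_map,
    output.users_ids_documents_ids,
    output.new_documents_ids,
    output.documents,
    output.documents_count,
    |processed, total| info!("indexed {} documents out of {}", processed, total),
)?;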
fn run_intern<'a, F>(
fn run_intern<F>(
    env: &heed::Env,
    index: &Index,
    opt: &IndexerOpt,
    content: &'a [u8],
    gzipped: bool,
    fields_ids_map: FieldsIdsMap,
    users_ids_documents_ids: fst::Map<Vec<u8>>,
    new_documents_ids: RoaringBitmap,
    documents: grenad::Reader<&[u8]>,
    documents_count: u32,
    progress_callback: F,
) -> anyhow::Result<()>
where F: Fn(u32) + Sync + Send,
where F: Fn(u32, u32) + Sync + Send,
{
    let before_indexing = Instant::now();
    let num_threads = rayon::current_num_threads();
@@ -271,14 +271,9 @@ where F: Fn(u32) + Sync + Send,
        None
    };
    let rtxn = env.read_txn()?;
    let number_of_documents = index.number_of_documents(&rtxn)?;
    drop(rtxn);
    let readers = csv_bytes_readers(content, gzipped, num_threads)
        .into_par_iter()
    let readers = rayon::iter::repeatn(documents, num_threads)
        .enumerate()
        .map(|(i, rdr)| {
        .map(|(i, documents)| {
            let store = Store::new(
                linked_hash_map_size,
                max_nb_chunks,
@@ -287,8 +282,7 @@ where F: Fn(u32) + Sync + Send,
                chunk_compression_level,
                chunk_fusing_shrink_size,
            )?;
            let base_document_id = number_of_documents;
            store.index_csv(rdr, base_document_id, i, num_threads, log_every_n, &progress_callback)
            store.index(documents, documents_count, i, num_threads, log_every_n, &progress_callback)
        })
        .collect::<Result<Vec<_>, _>>()?;
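The fan-out above replaces per-thread CSV readers with clones of a single grenad reader handed out by `rayon::iter::repeatn`. A self-contained sketch of that pattern, using a plain byte slice in place of `grenad::Reader`:

use rayon::prelude::*;

fn main() {
    // repeatn hands each worker its own clone of a cheaply cloneable value;
    // enumerate() gives every worker its index, as in the diff above.
    let documents: &[u8] = b"shared input";
    let num_threads = 4;
    let results: Vec<String> = rayon::iter::repeatn(documents, num_threads)
        .enumerate()
        .map(|(i, documents)| format!("worker {} sees {} bytes", i, documents.len()))
        .collect();
    println!("{:#?}", results);
}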
@@ -341,10 +335,32 @@ where F: Fn(u32) + Sync + Send,
        });
    });
    // We create the write transaction of this update.
    // TODO we must get this transaction as an argument to be able
    // to first delete the replaced documents for example.
    let mut wtxn = env.write_txn()?;
    let contains_documents = number_of_documents != 0;
    let write_method = if contains_documents { WriteMethod::GetMergePut } else { WriteMethod::Append };
    let contains_documents = index.documents_ids(&wtxn)?.map_or(false, |docids| !docids.is_empty());
    let write_method = if contains_documents {
        WriteMethod::GetMergePut
    } else {
        WriteMethod::Append
    };
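The switch above picks `Append` only when the index holds no documents yet. A simplified stand-in for the two strategies, assuming they behave as their names suggest (this is not the heed/LMDB code from the commit; a `BTreeMap` stands in for the database):

use std::collections::BTreeMap;

// Simplified stand-in: Append assumes pre-sorted, previously unseen keys,
// while GetMergePut first merges with any value already stored under the key.
fn write(db: &mut BTreeMap<Vec<u8>, Vec<u8>>, key: Vec<u8>, value: Vec<u8>, append: bool) {
    if append {
        db.insert(key, value);
    } else {
        let merged = match db.get(&key) {
            Some(existing) => [existing.as_slice(), value.as_slice()].concat(),
            None => value,
        };
        db.insert(key, merged);
    }
}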
    // We write the fields ids map into the main database.
    index.put_fields_ids_map(&mut wtxn, &fields_ids_map)?;
    // We write the users_ids_documents_ids into the main database.
    index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?;
    // We merge the new documents ids with the existing ones.
    match index.documents_ids(&wtxn)? {
        Some(mut documents_ids) => {
            documents_ids.union_with(&new_documents_ids);
            index.put_documents_ids(&mut wtxn, &documents_ids)?;
        },
        None => index.put_documents_ids(&mut wtxn, &new_documents_ids)?,
    }
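A minimal sketch of the documents-ids merge above: the freshly assigned ids are unioned into whatever ids the index already holds, using the same `RoaringBitmap::union_with` call as the diff.

use roaring::RoaringBitmap;

fn main() {
    let mut existing: RoaringBitmap = (0u32..3).collect();
    let new_documents_ids: RoaringBitmap = (3u32..5).collect();
    existing.union_with(&new_documents_ids); // same call as in the diff
    assert_eq!(existing.len(), 5);
}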
    debug!("Writing the docid word positions into LMDB on disk...");
    merge_into_lmdb_database(