Introduce option flags to toggle the indexing engine

This commit is contained in:
Clément Renault 2020-08-05 12:10:41 +02:00
parent bd4b18541c
commit 4873abe145
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -52,6 +52,14 @@ struct Opt {
#[structopt(short, long)] #[structopt(short, long)]
jobs: Option<usize>, jobs: Option<usize>,
/// MTBL max number of chunks in bytes.
#[structopt(long)]
max_nb_chunks: Option<usize>,
/// MTBL max memory in bytes.
#[structopt(long)]
max_memory: Option<usize>,
/// Verbose mode (-v, -vv, -vvv, etc.) /// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))] #[structopt(short, long, parse(from_occurrences))]
verbose: usize, verbose: usize,
@ -60,6 +68,10 @@ struct Opt {
csv_file: Option<PathBuf>, csv_file: Option<PathBuf>,
} }
fn lmdb_key_valid_size(key: &[u8]) -> bool {
!key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH
}
type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>; type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
struct Store { struct Store {
@ -69,15 +81,23 @@ struct Store {
} }
impl Store { impl Store {
fn new() -> Store { fn new(max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store {
let sorter = Sorter::builder(merge as MergeFn) let mut builder = Sorter::builder(merge as MergeFn);
.chunk_compression_type(CompressionType::Snappy)
.build(); builder.chunk_compression_type(CompressionType::Snappy);
if let Some(nb_chunks) = max_nb_chunks {
builder.max_nb_chunks(nb_chunks);
}
if let Some(memory) = max_memory {
builder.max_memory(memory);
}
Store { Store {
word_positions: ArcCache::new(65_535), word_positions: ArcCache::new(65_535),
word_position_docids: ArcCache::new(65_535), word_position_docids: ArcCache::new(65_535),
sorter, sorter: builder.build(),
} }
} }
@ -126,8 +146,10 @@ impl Store {
buffer.clear(); buffer.clear();
positions.serialize_into(&mut buffer)?; positions.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL // that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
sorter.insert(&key, &buffer)?; sorter.insert(&key, &buffer)?;
} }
}
Ok(()) Ok(())
} }
@ -149,7 +171,9 @@ impl Store {
buffer.clear(); buffer.clear();
ids.serialize_into(&mut buffer)?; ids.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL // that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
sorter.insert(&key, &buffer)?; sorter.insert(&key, &buffer)?;
}
// And cleanup the position afterward // And cleanup the position afterward
key.truncate(key.len() - position_bytes.len()); key.truncate(key.len() - position_bytes.len());
} }
@ -288,11 +312,13 @@ fn index_csv(
mut rdr: csv::Reader<File>, mut rdr: csv::Reader<File>,
thread_index: usize, thread_index: usize,
num_threads: usize, num_threads: usize,
max_nb_chunks: Option<usize>,
max_memory: Option<usize>,
) -> anyhow::Result<Reader<Mmap>> ) -> anyhow::Result<Reader<Mmap>>
{ {
debug!("{:?}: Indexing into an Indexed...", thread_index); debug!("{:?}: Indexing into an Indexed...", thread_index);
let mut store = Store::new(); let mut store = Store::new(max_nb_chunks, max_memory);
// Write the headers into a Vec of bytes and then into the store. // Write the headers into a Vec of bytes and then into the store.
let headers = rdr.headers()?; let headers = rdr.headers()?;
@ -310,21 +336,18 @@ fn index_csv(
if document_id % num_threads != thread_index { continue } if document_id % num_threads != thread_index { continue }
let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
if document_id % (ONE_MILLION as u32) == 0 { if document_id % (ONE_MILLION as u32) == 0 {
debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32); debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32);
} }
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH {
let word = word.cow_to_lowercase(); let word = word.cow_to_lowercase();
let position = (attr * MAX_POSITION + pos) as u32; let position = (attr * MAX_POSITION + pos) as u32;
store.insert_word_position(&word, position)?; store.insert_word_position(&word, position)?;
store.insert_word_position_docid(&word, position, document_id)?; store.insert_word_position_docid(&word, position, document_id)?;
} }
} }
}
// We write the document in the database. // We write the document in the database.
let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
@ -407,6 +430,9 @@ fn main() -> anyhow::Result<()> {
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
let max_nb_chunks = opt.max_nb_chunks;
let max_memory = opt.max_memory;
// We duplicate the file # jobs times. // We duplicate the file # jobs times.
let file = opt.csv_file.unwrap(); let file = opt.csv_file.unwrap();
let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::<Result<_, _>>()?; let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::<Result<_, _>>()?;
@ -414,7 +440,7 @@ fn main() -> anyhow::Result<()> {
let stores: Vec<_> = csv_readers let stores: Vec<_> = csv_readers
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
.map(|(i, rdr)| index_csv(rdr, i, num_threads)) .map(|(i, rdr)| index_csv(rdr, i, num_threads, max_nb_chunks, max_memory))
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
debug!("We are writing into LMDB..."); debug!("We are writing into LMDB...");