From 944df52e2ad0c4ddf5f13c830e9146286602a374 Mon Sep 17 00:00:00 2001
From: Clément Renault
Date: Mon, 21 Sep 2020 14:59:48 +0200
Subject: [PATCH] Simplify the indexer main loop

---
 src/bin/indexer.rs | 212 ++++++++++++++++++++++-----------------
 1 file changed, 103 insertions(+), 109 deletions(-)

diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index e2f6c012c..3f6e63477 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -4,7 +4,7 @@ use std::fs::File;
 use std::io::{self, Read, Write};
 use std::iter::FromIterator;
 use std::path::PathBuf;
-use std::thread;
+use std::{iter, thread};
 use std::time::Instant;
 
 use anyhow::Context;
@@ -129,7 +129,7 @@ struct Store {
 }
 
 impl Store {
-    fn new(
+    pub fn new(
         arc_cache_size: usize,
         max_nb_chunks: Option<usize>,
         max_memory: Option<usize>,
@@ -164,7 +164,7 @@ impl Store {
     }
 
     // Save the documents ids under the position and word we have seen it.
-    pub fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> {
+    fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> {
         let word_vec = SmallVec32::from(word.as_bytes());
         let ids = RoaringBitmap::from_iter(Some(id));
         let (_, lrus) = self.word_docids.insert(word_vec, ids, |old, new| old.union_with(&new));
@@ -172,13 +172,13 @@ impl Store {
         Ok(())
     }
 
-    pub fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> {
+    fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> {
         let headers = CsvStringRecordCodec::bytes_encode(headers)
             .with_context(|| format!("could not encode csv record"))?;
         Ok(self.sorter.insert(HEADERS_KEY, headers)?)
     }
 
-    pub fn write_document(
+    fn write_document(
         &mut self,
         id: DocumentId,
         iter: impl IntoIterator<Item=(String, RoaringBitmap)>,
@@ -248,7 +248,57 @@ impl Store {
         Ok(())
     }
 
-    pub fn finish(mut self) -> anyhow::Result<(Reader, Reader)> {
+    pub fn index_csv(
+        mut self,
+        mut rdr: csv::Reader<Box<dyn Read + Send>>,
+        thread_index: usize,
+        num_threads: usize,
+    ) -> anyhow::Result<(Reader, Reader)>
+    {
+        debug!("{:?}: Indexing in a Store...", thread_index);
+
+        // Write the headers into the store.
+        let headers = rdr.headers()?;
+        self.write_headers(&headers)?;
+
+        let mut before = Instant::now();
+        let mut document_id: usize = 0;
+        let mut document = csv::StringRecord::new();
+        let mut word_positions = HashMap::new();
+        while rdr.read_record(&mut document)? {
+
+            // We skip documents that must not be indexed by this thread.
+            if document_id % num_threads == thread_index {
+                if document_id % ONE_MILLION == 0 {
+                    let count = document_id / ONE_MILLION;
+                    info!("We have seen {}m documents so far ({:.02?}).", count, before.elapsed());
+                    before = Instant::now();
+                }
+
+                let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
+                for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
+                    for (pos, (_, token)) in simple_tokenizer(&content).filter(only_words).enumerate().take(MAX_POSITION) {
+                        let word = token.to_lowercase();
+                        let position = (attr * MAX_POSITION + pos) as u32;
+                        self.insert_word_docid(&word, document_id)?;
+                        word_positions.entry(word).or_insert_with(RoaringBitmap::new).insert(position);
+                    }
+                }
+
+                // We write the document in the documents store.
+                self.write_document(document_id, word_positions.drain(), &document)?;
+            }
+
+            // Compute the document id of the next document.
+            document_id = document_id + 1;
+        }
+
+        let (reader, docs_reader) = self.finish()?;
+        debug!("{:?}: Store created!", thread_index);
+        Ok((reader, docs_reader))
+    }
+
+    fn finish(mut self) -> anyhow::Result<(Reader, Reader)> {
         Self::write_word_docids(&mut self.sorter, self.word_docids)?;
         Self::write_documents_ids(&mut self.sorter, self.documents_ids)?;
 
@@ -375,66 +425,57 @@ where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
     Ok(())
 }
 
-fn index_csv(
-    mut rdr: csv::Reader<Box<dyn Read + Send>>,
-    thread_index: usize,
+/// Returns the list of CSV sources that the indexer must read.
+///
+/// There is `num_threads` sources. If the file is not specified, the standard input is used.
+fn csv_readers(
+    csv_file_path: Option<PathBuf>,
     num_threads: usize,
-    arc_cache_size: usize,
-    max_nb_chunks: Option<usize>,
-    max_memory: Option<usize>,
-    chunk_compression_type: CompressionType,
-    chunk_compression_level: Option<u32>,
-) -> anyhow::Result<(Reader, Reader)>
+) -> anyhow::Result<Vec<csv::Reader<Box<dyn Read + Send>>>>
 {
-    debug!("{:?}: Indexing into a Store...", thread_index);
-
-    let mut store = Store::new(
-        arc_cache_size,
-        max_nb_chunks,
-        max_memory,
-        chunk_compression_type,
-        chunk_compression_level,
-    );
-
-    // Write the headers into a Vec of bytes and then into the store.
-    let headers = rdr.headers()?;
-    store.write_headers(&headers)?;
-
-    let mut before = Instant::now();
-    let mut document_id: usize = 0;
-    let mut document = csv::StringRecord::new();
-    let mut word_positions = HashMap::new();
-    while rdr.read_record(&mut document)? {
-
-        // We skip documents that must not be indexed by this thread.
-        if document_id % num_threads == thread_index {
-            if document_id % ONE_MILLION == 0 {
-                info!("We have seen {}m documents so far ({:.02?}).",
-                    document_id / ONE_MILLION, before.elapsed());
-                before = Instant::now();
+    match csv_file_path {
+        Some(file_path) => {
+            // We open the file # jobs times.
+            iter::repeat_with(|| {
+                let file = File::open(&file_path)
+                    .with_context(|| format!("Failed to read CSV file {}", file_path.display()))?;
+                // if the file extension is "gz" or "gzip" we can decode and read it.
+                let r = if file_path.extension().map_or(false, |e| e == "gz" || e == "gzip") {
+                    Box::new(GzDecoder::new(file)) as Box<dyn Read + Send>
+                } else {
+                    Box::new(file) as Box<dyn Read + Send>
+                };
+                Ok(csv::Reader::from_reader(r)) as anyhow::Result<_>
+            })
+            .take(num_threads)
+            .collect()
+        },
+        None => {
+            let mut csv_readers = Vec::new();
+            let mut writers = Vec::new();
+            for (r, w) in iter::repeat_with(ringtail::io::pipe).take(num_threads) {
+                let r = Box::new(r) as Box<dyn Read + Send>;
+                csv_readers.push(csv::Reader::from_reader(r));
+                writers.push(w);
             }
 
-            let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
-            for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
-                for (pos, (_, token)) in simple_tokenizer(&content).filter(only_words).enumerate().take(MAX_POSITION) {
-                    let word = token.to_lowercase();
-                    let position = (attr * MAX_POSITION + pos) as u32;
-                    store.insert_word_docid(&word, document_id)?;
-                    word_positions.entry(word).or_insert_with(RoaringBitmap::new).insert(position);
+            thread::spawn(move || {
+                let stdin = std::io::stdin();
+                let mut stdin = stdin.lock();
+                let mut buffer = [0u8; 4096];
+                loop {
+                    match stdin.read(&mut buffer)? {
+                        0 => return Ok(()) as io::Result<()>,
+                        size => for w in &mut writers {
+                            w.write_all(&buffer[..size])?;
+                        }
+                    }
                 }
-            }
+            });
 
-            // We write the document in the database.
-            store.write_document(document_id, word_positions.drain(), &document)?;
-        }
-
-        // Compute the document id of the the next document.
-        document_id = document_id + 1;
+            Ok(csv_readers)
+        },
     }
-
-    let (reader, docs_reader) = store.finish()?;
-    debug!("{:?}: Store created!", thread_index);
-    Ok((reader, docs_reader))
 }
 
 fn main() -> anyhow::Result<()> {
@@ -466,69 +507,22 @@ fn main() -> anyhow::Result<()> {
     let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type);
     let chunk_compression_level = opt.indexer.chunk_compression_level;
 
-    let csv_readers = match opt.csv_file {
-        Some(file_path) => {
-            // We open the file # jobs times.
-            (0..num_threads)
-                .map(|_| {
-                    let file = File::open(&file_path)?;
-                    // if the file extension is "gz" or "gzip" we can decode and read it.
-                    let r = if file_path.extension().map_or(false, |ext| ext == "gz" || ext == "gzip") {
-                        Box::new(GzDecoder::new(file)) as Box<dyn Read + Send>
-                    } else {
-                        Box::new(file) as Box<dyn Read + Send>
-                    };
-                    Ok(csv::Reader::from_reader(r)) as io::Result<_>
-                })
-                .collect::<Result<Vec<_>, _>>()?
-        },
-        None => {
-            let mut csv_readers = Vec::new();
-            let mut writers = Vec::new();
-            for (r, w) in (0..num_threads).map(|_| ringtail::io::pipe()) {
-                let r = Box::new(r) as Box<dyn Read + Send>;
-                csv_readers.push(csv::Reader::from_reader(r));
-                writers.push(w);
-            }
-
-            thread::spawn(move || {
-                let stdin = std::io::stdin();
-                let mut stdin = stdin.lock();
-                let mut buffer = [0u8; 4096];
-                loop {
-                    match stdin.read(&mut buffer)? {
-                        0 => return Ok(()) as io::Result<()>,
-                        size => for w in &mut writers {
-                            w.write_all(&buffer[..size])?;
-                        }
-                    }
-                }
-            });
-
-            csv_readers
-        },
-    };
-
-    let readers = csv_readers
+    let readers = csv_readers(opt.csv_file, num_threads)?
         .into_par_iter()
         .enumerate()
         .map(|(i, rdr)| {
-            index_csv(
-                rdr,
-                i,
-                num_threads,
+            Store::new(
                 arc_cache_size,
                 max_nb_chunks,
                 max_memory,
                 chunk_compression_type,
                 chunk_compression_level,
-            )
+            ).index_csv(rdr, i, num_threads)
        })
         .collect::<Result<Vec<_>, _>>()?;
 
     let mut stores = Vec::with_capacity(readers.len());
     let mut docs_stores = Vec::with_capacity(readers.len());
-
     readers.into_iter().for_each(|(s, d)| {
         stores.push(s);
         docs_stores.push(d);
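
The core of the new `Store::index_csv` method is a round-robin split of the input: every thread reads the whole CSV stream, but only indexes the records whose position satisfies `document_id % num_threads == thread_index`. Below is a minimal, self-contained sketch of that partitioning rule on plain strings; the data and names are illustrative and not taken from the patch.

    use std::thread;

    fn main() {
        let num_threads: usize = 4;
        let documents: Vec<String> = (0..10).map(|i| format!("doc-{}", i)).collect();

        let handles: Vec<_> = (0..num_threads).map(|thread_index| {
            let documents = documents.clone();
            thread::spawn(move || {
                // Every worker scans the same record stream but only keeps the
                // records whose position falls on its slot, mirroring the
                // `document_id % num_threads == thread_index` test in `index_csv`.
                for (document_id, document) in documents.iter().enumerate() {
                    if document_id % num_threads == thread_index {
                        println!("thread {} indexes {}", thread_index, document);
                    }
                }
            })
        }).collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }

The trade-off is the same as in the indexer: parsing work is duplicated across threads, but no work queue or coordination is needed and the generated document ids stay deterministic.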
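
When no CSV file is given, `csv_readers` duplicates standard input instead: a single background thread reads stdin once and writes every chunk into one pipe per worker, so each thread still sees the complete stream. The sketch below reproduces that fan-out with std `mpsc` channels standing in for the `ringtail` pipes used by the patch; the channel plumbing is only an illustration, not the indexer's actual I/O types.

    use std::io::{self, Read};
    use std::sync::mpsc;
    use std::thread;

    fn main() -> io::Result<()> {
        let num_threads = 3;

        // One channel per worker: the receiving halves play the role of the
        // pipe readers that the patch wraps in `csv::Reader::from_reader`.
        let (senders, receivers): (Vec<_>, Vec<_>) =
            (0..num_threads).map(|_| mpsc::channel::<Vec<u8>>()).unzip();

        // A single thread drains stdin and copies every chunk to all workers,
        // like the `thread::spawn` loop over `writers` in `csv_readers`.
        let broadcaster = thread::spawn(move || -> io::Result<()> {
            let stdin = io::stdin();
            let mut stdin = stdin.lock();
            let mut buffer = [0u8; 4096];
            loop {
                match stdin.read(&mut buffer)? {
                    // On EOF the senders are dropped, which closes every channel.
                    0 => return Ok(()),
                    size => {
                        for sender in &senders {
                            let _ = sender.send(buffer[..size].to_vec());
                        }
                    }
                }
            }
        });

        let workers: Vec<_> = receivers.into_iter().enumerate().map(|(i, receiver)| {
            thread::spawn(move || {
                let mut total = 0;
                while let Ok(chunk) = receiver.recv() {
                    total += chunk.len();
                }
                println!("worker {} received {} bytes", i, total);
            })
        }).collect();

        broadcaster.join().unwrap()?;
        for worker in workers {
            worker.join().unwrap();
        }
        Ok(())
    }

Piping a file through the program (for example `cat data.csv | ./fanout`, where `fanout` is just a name for this sketch) should make every worker report the same byte count, which is the property the indexer relies on: each thread can parse the full CSV from its own reader.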