Introduce an append-only indexing system

This commit is contained in:
Clément Renault 2020-10-20 15:00:58 +02:00
parent a122d3d466
commit 8ed8abb9df
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 136 additions and 37 deletions

View File

@ -8,7 +8,7 @@ use bstr::ByteSlice as _;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use heed::types::ByteSlice; use heed::types::ByteSlice;
use log::{debug, info}; use log::{debug, info, error};
use rayon::prelude::*; use rayon::prelude::*;
use structopt::StructOpt; use structopt::StructOpt;
use tempfile::tempfile; use tempfile::tempfile;
@ -23,7 +23,7 @@ use self::merge_function::{
mod store; mod store;
mod merge_function; mod merge_function;
#[derive(Debug, StructOpt)] #[derive(Debug, Clone, StructOpt)]
pub struct IndexerOpt { pub struct IndexerOpt {
/// The amount of documents to skip before printing /// The amount of documents to skip before printing
/// a log regarding the indexing advancement. /// a log regarding the indexing advancement.
@ -75,6 +75,12 @@ pub struct IndexerOpt {
indexing_jobs: Option<usize>, indexing_jobs: Option<usize>,
} }
#[derive(Debug, Copy, Clone)]
enum WriteMethod {
Append,
GetMergePut,
}
type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>; type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> { fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
@ -134,6 +140,7 @@ fn merge_into_lmdb_database(
database: heed::PolyDatabase, database: heed::PolyDatabase,
sources: Vec<Reader<FileFuse>>, sources: Vec<Reader<FileFuse>>,
merge: MergeFn, merge: MergeFn,
method: WriteMethod,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
debug!("Merging {} MTBL stores...", sources.len()); debug!("Merging {} MTBL stores...", sources.len());
let before = Instant::now(); let before = Instant::now();
@ -141,9 +148,26 @@ fn merge_into_lmdb_database(
let merger = merge_readers(sources, merge); let merger = merge_readers(sources, merge);
let mut in_iter = merger.into_merge_iter()?; let mut in_iter = merger.into_merge_iter()?;
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; match method {
while let Some((k, v)) = in_iter.next()? { WriteMethod::Append => {
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = in_iter.next()? {
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
}
},
WriteMethod::GetMergePut => {
while let Some((k, v)) = in_iter.next()? {
match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? {
Some(old_val) => {
// TODO improve the function signature and avoid allocating here!
let vals = vec![old_val.to_vec(), v.to_vec()];
let val = merge(k, &vals).expect("merge failed");
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)?
},
None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?,
}
}
},
} }
debug!("MTBL stores merged in {:.02?}!", before.elapsed()); debug!("MTBL stores merged in {:.02?}!", before.elapsed());
@ -154,13 +178,32 @@ fn write_into_lmdb_database(
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase, database: heed::PolyDatabase,
mut reader: Reader<FileFuse>, mut reader: Reader<FileFuse>,
merge: MergeFn,
method: WriteMethod,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
debug!("Writing MTBL stores..."); debug!("Writing MTBL stores...");
let before = Instant::now(); let before = Instant::now();
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; match method {
while let Some((k, v)) = reader.next()? { WriteMethod::Append => {
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = reader.next()? {
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
}
},
WriteMethod::GetMergePut => {
while let Some((k, v)) = reader.next()? {
match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? {
Some(old_val) => {
// TODO improve the function signature and avoid allocating here!
let vals = vec![old_val.to_vec(), v.to_vec()];
let val = merge(k, &vals).expect("merge failed");
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)?
},
None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?,
}
}
}
} }
debug!("MTBL stores merged in {:.02?}!", before.elapsed()); debug!("MTBL stores merged in {:.02?}!", before.elapsed());
@ -191,7 +234,7 @@ fn csv_bytes_readers<'a>(
pub fn run<'a>( pub fn run<'a>(
env: &heed::Env, env: &heed::Env,
index: &Index, index: &Index,
opt: IndexerOpt, opt: &IndexerOpt,
content: &'a [u8], content: &'a [u8],
gzipped: bool, gzipped: bool,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
@ -204,7 +247,7 @@ pub fn run<'a>(
fn run_intern<'a>( fn run_intern<'a>(
env: &heed::Env, env: &heed::Env,
index: &Index, index: &Index,
opt: IndexerOpt, opt: &IndexerOpt,
content: &'a [u8], content: &'a [u8],
gzipped: bool, gzipped: bool,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
@ -224,6 +267,10 @@ fn run_intern<'a>(
None None
}; };
let rtxn = env.read_txn()?;
let number_of_documents = index.number_of_documents(&rtxn)?;
drop(rtxn);
let readers = csv_bytes_readers(content, gzipped, num_threads) let readers = csv_bytes_readers(content, gzipped, num_threads)
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
@ -236,7 +283,8 @@ fn run_intern<'a>(
chunk_compression_level, chunk_compression_level,
chunk_fusing_shrink_size, chunk_fusing_shrink_size,
)?; )?;
store.index_csv(rdr, i, num_threads, log_every_n) let base_document_id = number_of_documents;
store.index_csv(rdr, base_document_id, i, num_threads, log_every_n)
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
@ -283,18 +331,24 @@ fn run_intern<'a>(
.into_par_iter() .into_par_iter()
.for_each(|(dbtype, readers, merge)| { .for_each(|(dbtype, readers, merge)| {
let result = merge_readers(readers, merge); let result = merge_readers(readers, merge);
sender.send((dbtype, result)).unwrap(); if let Err(e) = sender.send((dbtype, result)) {
error!("sender error: {}", e);
}
}); });
}); });
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;
let contains_documents = number_of_documents != 0;
let write_method = if contains_documents { WriteMethod::GetMergePut } else { WriteMethod::Append };
debug!("Writing the docid word positions into LMDB on disk..."); debug!("Writing the docid word positions into LMDB on disk...");
merge_into_lmdb_database( merge_into_lmdb_database(
&mut wtxn, &mut wtxn,
*index.docid_word_positions.as_polymorph(), *index.docid_word_positions.as_polymorph(),
docid_word_positions_readers, docid_word_positions_readers,
docid_word_positions_merge, docid_word_positions_merge,
write_method
)?; )?;
debug!("Writing the documents into LMDB on disk..."); debug!("Writing the documents into LMDB on disk...");
@ -303,6 +357,7 @@ fn run_intern<'a>(
*index.documents.as_polymorph(), *index.documents.as_polymorph(),
documents_readers, documents_readers,
documents_merge, documents_merge,
write_method
)?; )?;
for (db_type, result) in receiver { for (db_type, result) in receiver {
@ -310,17 +365,23 @@ fn run_intern<'a>(
match db_type { match db_type {
DatabaseType::Main => { DatabaseType::Main => {
debug!("Writing the main elements into LMDB on disk..."); debug!("Writing the main elements into LMDB on disk...");
write_into_lmdb_database(&mut wtxn, index.main, content)?; write_into_lmdb_database(&mut wtxn, index.main, content, main_merge, write_method)?;
}, },
DatabaseType::WordDocids => { DatabaseType::WordDocids => {
debug!("Writing the words docids into LMDB on disk..."); debug!("Writing the words docids into LMDB on disk...");
let db = *index.word_docids.as_polymorph(); let db = *index.word_docids.as_polymorph();
write_into_lmdb_database(&mut wtxn, db, content)?; write_into_lmdb_database(&mut wtxn, db, content, word_docids_merge, write_method)?;
}, },
DatabaseType::WordsPairsProximitiesDocids => { DatabaseType::WordsPairsProximitiesDocids => {
debug!("Writing the words pairs proximities docids into LMDB on disk..."); debug!("Writing the words pairs proximities docids into LMDB on disk...");
let db = *index.word_pair_proximity_docids.as_polymorph(); let db = *index.word_pair_proximity_docids.as_polymorph();
write_into_lmdb_database(&mut wtxn, db, content)?; write_into_lmdb_database(
&mut wtxn,
db,
content,
words_pairs_proximities_docids_merge,
write_method,
)?;
}, },
} }
} }

View File

@ -304,6 +304,7 @@ impl Store {
pub fn index_csv<'a>( pub fn index_csv<'a>(
mut self, mut self,
mut rdr: csv::Reader<Box<dyn Read + Send + 'a>>, mut rdr: csv::Reader<Box<dyn Read + Send + 'a>>,
base_document_id: usize,
thread_index: usize, thread_index: usize,
num_threads: usize, num_threads: usize,
log_every_n: usize, log_every_n: usize,
@ -316,7 +317,7 @@ impl Store {
self.write_headers(&headers)?; self.write_headers(&headers)?;
let mut before = Instant::now(); let mut before = Instant::now();
let mut document_id: usize = 0; let mut document_id: usize = base_document_id;
let mut document = csv::StringRecord::new(); let mut document = csv::StringRecord::new();
let mut words_positions = HashMap::new(); let mut words_positions = HashMap::new();

View File

@ -63,5 +63,5 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let file = File::open(file_path)?; let file = File::open(file_path)?;
let content = unsafe { memmap::Mmap::map(&file)? }; let content = unsafe { memmap::Mmap::map(&file)? };
indexing::run(&env, &index, opt.indexer, &content, gzipped) indexing::run(&env, &index, &opt.indexer, &content, gzipped)
} }

View File

@ -4,7 +4,6 @@ use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use askama_warp::Template; use askama_warp::Template;
@ -19,7 +18,7 @@ use tokio::sync::broadcast;
use warp::filters::ws::Message; use warp::filters::ws::Message;
use warp::{Filter, http::Response}; use warp::{Filter, http::Response};
use crate::indexing::IndexerOpt; use crate::indexing::{self, IndexerOpt};
use crate::tokenizer::{simple_tokenizer, TokenType}; use crate::tokenizer::{simple_tokenizer, TokenType};
use crate::{Index, UpdateStore, SearchResult}; use crate::{Index, UpdateStore, SearchResult};
@ -111,6 +110,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
.timestamp(stderrlog::Timestamp::Off) .timestamp(stderrlog::Timestamp::Off)
.init()?; .init()?;
create_dir_all(&opt.database)?;
let env = EnvOpenOptions::new() let env = EnvOpenOptions::new()
.map_size(opt.database_size) .map_size(opt.database_size)
.max_dbs(10) .max_dbs(10)
@ -128,48 +128,73 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let (update_status_sender, _) = broadcast::channel(100); let (update_status_sender, _) = broadcast::channel(100);
let update_status_sender_cloned = update_status_sender.clone(); let update_status_sender_cloned = update_status_sender.clone();
let env_cloned = env.clone();
let index_cloned = index.clone();
let indexer_opt_cloned = opt.indexer.clone();
let update_store = UpdateStore::open( let update_store = UpdateStore::open(
update_store_options, update_store_options,
update_store_path, update_store_path,
move |update_id, meta: String, _content| { move |update_id, meta: String, content| {
let processing = UpdateStatus::Processing { update_id, meta: meta.clone() }; let processing = UpdateStatus::Processing { update_id, meta: meta.clone() };
let _ = update_status_sender_cloned.send(processing); let _ = update_status_sender_cloned.send(processing);
std::thread::sleep(Duration::from_secs(3)); let _progress = UpdateStatus::Progressing { update_id, meta: meta.clone() };
// let _ = update_status_sender_cloned.send(progress);
let progress = UpdateStatus::Progressing { update_id, meta: meta.clone() }; let gzipped = false;
let _ = update_status_sender_cloned.send(progress); let result = indexing::run(
&env_cloned,
&index_cloned,
&indexer_opt_cloned,
content,
gzipped,
);
std::thread::sleep(Duration::from_secs(3)); let meta = match result {
Ok(()) => format!("valid update content"),
let progress = UpdateStatus::Progressing { update_id, meta: meta.clone() }; Err(e) => {
let _ = update_status_sender_cloned.send(progress); format!("error while processing update content: {}", e)
}
std::thread::sleep(Duration::from_secs(3)); };
let processed = UpdateStatus::Processed { update_id, meta: meta.clone() }; let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
let _ = update_status_sender_cloned.send(processed); let _ = update_status_sender_cloned.send(processed);
Ok(meta) Ok(meta)
})?; })?;
// Retrieve the database the file stem (w/o the extension), // The database name will not change.
// the disk file size and the number of documents in the database.
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize; let lmdb_path = opt.database.join("data.mdb");
let rtxn = env.read_txn()?;
let docs_count = index.number_of_documents(&rtxn)? as usize;
drop(rtxn);
// We run and wait on the HTTP server // We run and wait on the HTTP server
// Expose an HTML page to debug the search in a browser // Expose an HTML page to debug the search in a browser
let db_name_cloned = db_name.clone(); let db_name_cloned = db_name.clone();
let lmdb_path_cloned = lmdb_path.clone();
let env_cloned = env.clone();
let index_cloned = index.clone();
let dash_html_route = warp::filters::method::get() let dash_html_route = warp::filters::method::get()
.and(warp::filters::path::end()) .and(warp::filters::path::end())
.map(move || IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }); .map(move || {
// We retrieve the database size.
let db_size = File::open(lmdb_path_cloned.clone())
.unwrap()
.metadata()
.unwrap()
.len() as usize;
// And the number of documents in the database.
let rtxn = env_cloned.clone().read_txn().unwrap();
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }
});
let update_store_cloned = update_store.clone(); let update_store_cloned = update_store.clone();
let lmdb_path_cloned = lmdb_path.clone();
let env_cloned = env.clone();
let index_cloned = index.clone();
let updates_list_or_html_route = warp::filters::method::get() let updates_list_or_html_route = warp::filters::method::get()
.and(warp::header("Accept")) .and(warp::header("Accept"))
.and(warp::path!("updates")) .and(warp::path!("updates"))
@ -190,6 +215,18 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
if header.contains("text/html") { if header.contains("text/html") {
updates.reverse(); updates.reverse();
// We retrieve the database size.
let db_size = File::open(lmdb_path_cloned.clone())
.unwrap()
.metadata()
.unwrap()
.len() as usize;
// And the number of documents in the database.
let rtxn = env_cloned.clone().read_txn().unwrap();
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
let template = UpdatesTemplate { let template = UpdatesTemplate {
db_name: db_name.clone(), db_name: db_name.clone(),
db_size, db_size,