diff --git a/Cargo.lock b/Cargo.lock index 2e210231d..35ace21fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,11 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -[[package]] -name = "adler32" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" - [[package]] name = "anyhow" version = "1.0.31" @@ -159,9 +153,6 @@ name = "cc" version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" -dependencies = [ - "jobserver", -] [[package]] name = "cfg-if" @@ -195,21 +186,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79bb3adfaf5f75d24b01aee375f7555907840fa2800e5ec8fa3b9e2031830173" -[[package]] -name = "crc32c" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ba37ef26c12988c1cee882d522d65e1d5d2ad8c3864665b88ee92767ed84c5" - -[[package]] -name = "crc32fast" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" -dependencies = [ - "cfg-if", -] - [[package]] name = "criterion" version = "0.3.2" @@ -347,18 +323,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -[[package]] -name = "flate2" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42" -dependencies = [ - "cfg-if", - "crc32fast", - "libc", - "miniz_oxide", -] - [[package]] name = "fnv" version = "1.0.7" @@ -523,12 +487,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "glob" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" - [[package]] name = "h2" version = "0.2.5" @@ -752,15 +710,6 @@ dependencies = [ "libc", ] -[[package]] -name = "jobserver" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" -dependencies = [ - "libc", -] - [[package]] name = "js-sys" version = "0.3.40" @@ -861,8 +810,6 @@ dependencies = [ "levenshtein_automata", "memmap", "once_cell", - "oxidized-mtbl", - "rayon", "roaring", "serde", "slice-group-by", @@ -936,15 +883,6 @@ dependencies = [ "unicase 2.6.0", ] -[[package]] -name = "miniz_oxide" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" -dependencies = [ - "adler32", -] - [[package]] name = "mio" version = "0.6.22" @@ -1075,18 +1013,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" -[[package]] -name = "oxidized-mtbl" -version = "0.1.0" -source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=9451be8#9451be8829562f7d1f8d34aa3ecb81c5106a0623" -dependencies = [ - "byteorder", - "crc32c", - "flate2", - "snap", - "zstd", -] - [[package]] name = "page_size" version = "0.4.2" @@ -1642,12 +1568,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" -[[package]] -name = "snap" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7fb9b0bb877b35a1cc1474a3b43d9c226a2625311760cdda2cbccbc0c7a8376" - [[package]] name = "socket2" version = "0.3.12" @@ -2148,34 +2068,3 @@ dependencies = [ "syn", "synstructure", ] - -[[package]] -name = "zstd" -version = "0.5.2+zstd.1.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644352b10ce7f333d6e0af85bd4f5322dc449416dc1211c6308e95bca8923db4" -dependencies = [ - "zstd-safe", -] - -[[package]] -name = "zstd-safe" -version = "2.0.4+zstd.1.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7113c0c9aed2c55181f2d9f5b0a36e7d2c0183b11c058ab40b35987479efe4d7" -dependencies = [ - "libc", - "zstd-sys", -] - -[[package]] -name = "zstd-sys" -version = "1.4.16+zstd.1.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c442965efc45353be5a9b9969c9b0872fff6828c7e06d118dda2cb2d0bb11d5a" -dependencies = [ - "cc", - "glob", - "itertools", - "libc", -] diff --git a/Cargo.toml b/Cargo.toml index 79707af9e..220f2653a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,6 @@ jemallocator = "0.3.2" levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] } memmap = "0.7.0" once_cell = "1.4.0" -oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" } -rayon = "1.3.0" roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "deserialize-from-slice" } slice-group-by = "0.2.6" smallstr = "0.2.0" diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 5013a0ce9..5b0d8f1b4 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -1,22 +1,21 @@ use std::collections::hash_map::Entry; -use std::collections::{HashMap, BTreeSet, BTreeMap}; +use std::collections::{HashMap, BTreeSet}; use std::convert::{TryFrom, TryInto}; -use std::fs::File; +use std::io; +use std::iter::FromIterator; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::Context; use cow_utils::CowUtils; -use fst::{Streamer, IntoStreamer}; +use fst::Streamer; use heed::EnvOpenOptions; use heed::types::*; -use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; -use rayon::prelude::*; use roaring::RoaringBitmap; use slice_group_by::StrGroupBy; use structopt::StructOpt; -use mega_mini_indexer::{FastMap4, SmallVec32, Index, DocumentId, AttributeId}; +use mega_mini_indexer::{BEU32, Index, DocumentId}; const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; @@ -40,218 +39,12 @@ struct Opt { #[structopt(long = "db", parse(from_os_str))] database: PathBuf, - /// Number of parallel jobs, defaults to # of CPUs. - #[structopt(short, long)] - jobs: Option, - - /// Files to index in parallel. - files_to_index: Vec, + /// CSV file to index. + csv_file: Option, } -struct Indexed { - fst: fst::Set>, - postings_attrs: FastMap4, RoaringBitmap>, - prefix_postings_attrs: FastMap4, RoaringBitmap>, - postings_ids: FastMap4, FastMap4>, - prefix_postings_ids: FastMap4, FastMap4>, - headers: Vec, - documents: Vec<(DocumentId, Vec)>, -} - -#[derive(Default)] -struct MtblKvStore(Option); - -impl MtblKvStore { - fn from_indexed(mut indexed: Indexed) -> anyhow::Result { - eprintln!("{:?}: Creating an MTBL store from an Indexed...", rayon::current_thread_index()); - - let outfile = tempfile::tempfile()?; - let mut out = Writer::new(outfile, None)?; - - out.add(b"\0headers", indexed.headers)?; - out.add(b"\0words-fst", indexed.fst.as_fst().as_bytes())?; - - // postings ids keys are all prefixed by a '1' - let mut key = vec![0]; - let mut buffer = Vec::new(); - - // We must write the postings attrs - key[0] = 1; - // We must write the postings ids in order for mtbl therefore - // we iterate over the fst to read the words in order - let mut stream = indexed.fst.stream(); - while let Some(word) = stream.next() { - if let Some(attrs) = indexed.postings_attrs.remove(word) { - key.truncate(1); - key.extend_from_slice(word); - // We serialize the attrs ids into a buffer - buffer.clear(); - attrs.serialize_into(&mut buffer)?; - // that we write under the generated key into MTBL - out.add(&key, &buffer).unwrap(); - } - } - - // We must write the prefix postings attrs - key[0] = 2; - // We must write the postings ids in order for mtbl therefore - // we iterate over the fst to read the words in order - let mut stream = indexed.fst.stream(); - while let Some(word) = stream.next() { - for i in 1..=word.len() { - let prefix = &word[..i]; - if let Some(attrs) = indexed.prefix_postings_attrs.remove(prefix) { - key.truncate(1); - key.extend_from_slice(prefix); - // We serialize the attrs ids into a buffer - buffer.clear(); - attrs.serialize_into(&mut buffer)?; - // that we write under the generated key into MTBL - out.add(&key, &buffer).unwrap(); - } - } - } - - // We must write the postings ids - key[0] = 3; - // We must write the postings ids in order for mtbl therefore - // we iterate over the fst to read the words in order - let mut stream = indexed.fst.stream(); - while let Some(word) = stream.next() { - key.truncate(1); - key.extend_from_slice(word); - if let Some(attrs) = indexed.postings_ids.remove(word) { - let attrs: BTreeMap<_, _> = attrs.into_iter().collect(); - // We iterate over all the attributes containing the documents ids - for (attr, ids) in attrs { - // we postfix the word by the attribute id - key.extend_from_slice(&attr.to_be_bytes()); - // We serialize the document ids into a buffer - buffer.clear(); - ids.serialize_into(&mut buffer)?; - // that we write under the generated key into MTBL - out.add(&key, &buffer).unwrap(); - // And cleanup the attribute id afterward (u32 = 4 * u8) - key.truncate(key.len() - 4); - } - } - } - - // We must write the prefix postings ids - key[0] = 4; - let mut stream = indexed.fst.stream(); - while let Some(word) = stream.next() { - for i in 1..=word.len() { - let prefix = &word[..i]; - key.truncate(1); - key.extend_from_slice(prefix); - if let Some(attrs) = indexed.prefix_postings_ids.remove(prefix) { - let attrs: BTreeMap<_, _> = attrs.into_iter().collect(); - // We iterate over all the attributes containing the documents ids - for (attr, ids) in attrs { - // we postfix the word by the attribute id - key.extend_from_slice(&attr.to_be_bytes()); - // We serialize the document ids into a buffer - buffer.clear(); - ids.serialize_into(&mut buffer)?; - // that we write under the generated key into MTBL - out.add(&key, &buffer).unwrap(); - // And cleanup the attribute id afterward (u32 = 4 * u8) - key.truncate(key.len() - 4); - } - } - } - } - - // postings ids keys are all prefixed by a '4' - key[0] = 5; - indexed.documents.sort_unstable(); - for (id, content) in indexed.documents { - key.truncate(1); - key.extend_from_slice(&id.to_be_bytes()); - out.add(&key, content).unwrap(); - } - - let out = out.into_inner()?; - - eprintln!("{:?}: MTBL store created!", rayon::current_thread_index()); - Ok(MtblKvStore(Some(out))) - } - - fn merge(key: &[u8], values: &[Vec]) -> Option> { - if key == b"\0words-fst" { - let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect(); - - // Union of the two FSTs - let mut op = fst::set::OpBuilder::new(); - fsts.iter().for_each(|fst| op.push(fst.into_stream())); - let op = op.r#union(); - - let mut build = fst::SetBuilder::memory(); - build.extend_stream(op.into_stream()).unwrap(); - Some(build.into_inner().unwrap()) - } - else if key == b"\0headers" { - assert!(values.windows(2).all(|vs| vs[0] == vs[1])); - Some(values[0].to_vec()) - } - // We either merge postings attrs, prefix postings or postings ids. - else if key[0] == 1 || key[0] == 2 || key[0] == 3 || key[0] == 4 { - let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap(); - - for value in &values[1..] { - let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap(); - first.union_with(&bitmap); - } - - let mut vec = Vec::new(); - first.serialize_into(&mut vec).unwrap(); - Some(vec) - } - else if key[0] == 5 { - assert!(values.windows(2).all(|vs| vs[0] == vs[1])); - Some(values[0].to_vec()) - } - else { - panic!("wut? {:?}", key) - } - } - - fn from_many(stores: Vec, mut f: F) -> anyhow::Result<()> - where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> - { - eprintln!("{:?}: Merging {} MTBL stores...", rayon::current_thread_index(), stores.len()); - - let mmaps: Vec<_> = stores.iter().flat_map(|m| { - m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() }) - }).collect(); - - let sources = mmaps.iter().map(|mmap| { - Reader::new(&mmap, ReaderOptions::default()).unwrap() - }).collect(); - - let opt = MergerOptions { merge: MtblKvStore::merge }; - let mut merger = Merger::new(sources, opt); - - let mut iter = merger.iter(); - while let Some((k, v)) = iter.next() { - (f)(k, v)?; - } - - eprintln!("{:?}: MTBL stores merged!", rayon::current_thread_index()); - Ok(()) - } -} - -fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { - eprintln!("{:?}: Indexing into an Indexed...", rayon::current_thread_index()); - - let mut document = csv::StringRecord::new(); - let mut postings_attrs = FastMap4::default(); - let prefix_postings_attrs = FastMap4::default(); - let mut postings_ids = FastMap4::default(); - let prefix_postings_ids = FastMap4::default(); - let mut documents = Vec::new(); +fn index_csv(wtxn: &mut heed::RwTxn, mut rdr: csv::Reader, index: &Index) -> anyhow::Result<()> { + eprintln!("Indexing into LMDB..."); // Write the headers into a Vec of bytes. let headers = rdr.headers()?; @@ -259,6 +52,8 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { writer.write_byte_record(headers.as_byte_record())?; let headers = writer.into_inner()?; + let mut document = csv::StringRecord::new(); + while rdr.read_record(&mut document)? { let document_id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst); let document_id = DocumentId::try_from(document_id).context("Generated id is too big")?; @@ -269,14 +64,26 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { let word = word.cow_to_lowercase(); let position = (attr * 1000 + pos) as u32; - // We save the positions where this word has been seen. - postings_attrs.entry(SmallVec32::from(word.as_bytes())) - .or_insert_with(RoaringBitmap::new).insert(position); + // ------ merge word positions -------- - // We save the documents ids under the position and word we have seen it. - postings_ids.entry(SmallVec32::from(word.as_bytes())) - .or_insert_with(FastMap4::default).entry(position) // positions - .or_insert_with(RoaringBitmap::new).insert(document_id); // document ids + let ids = match index.word_positions.get(wtxn, &word)? { + Some(mut ids) => { ids.insert(position); ids }, + None => RoaringBitmap::from_iter(Some(position)), + }; + + index.word_positions.put(wtxn, &word, &ids)?; + + // ------ merge word position documents ids -------- + + let mut key = word.as_bytes().to_vec(); + key.extend_from_slice(&position.to_be_bytes()); + + let ids = match index.word_position_docids.get(wtxn, &key)? { + Some(mut ids) => { ids.insert(document_id); ids }, + None => RoaringBitmap::from_iter(Some(document_id)), + }; + + index.word_position_docids.put(wtxn, &key, &ids)?; } } } @@ -285,66 +92,21 @@ fn index_csv(mut rdr: csv::Reader) -> anyhow::Result { let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); writer.write_byte_record(document.as_byte_record())?; let document = writer.into_inner()?; - documents.push((document_id, document)); + index.documents.put(wtxn, &BEU32::new(document_id), &document)?; } // We store the words from the postings. let mut new_words = BTreeSet::default(); - for (word, _new_ids) in &postings_ids { + let iter = index.word_positions.as_polymorph().iter::<_, Str, DecodeIgnore>(wtxn)?; + for result in iter { + let (word, ()) = result?; new_words.insert(word.clone()); } - let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; + let new_words_fst = fst::Set::from_iter(new_words)?; - let indexed = Indexed { - fst: new_words_fst, - headers, - postings_attrs, - prefix_postings_attrs, - postings_ids, - prefix_postings_ids, - documents, - }; - eprintln!("{:?}: Indexed created!", rayon::current_thread_index()); - - MtblKvStore::from_indexed(indexed) -} - -// TODO merge with the previous values -fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> { - if key == b"\0words-fst" { - // Write the words fst - index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?; - } - else if key == b"\0headers" { - // Write the headers - index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; - } - else if key.starts_with(&[1]) { - // Write the postings lists - index.word_positions.as_polymorph() - .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; - } - else if key.starts_with(&[2]) { - // Write the prefix postings lists - index.prefix_word_positions.as_polymorph() - .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; - } - else if key.starts_with(&[3]) { - // Write the postings lists - index.word_position_docids.as_polymorph() - .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; - } - else if key.starts_with(&[4]) { - // Write the prefix postings lists - index.prefix_word_position_docids.as_polymorph() - .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; - } - else if key.starts_with(&[5]) { - // Write the documents - index.documents.as_polymorph() - .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; - } + index.put_fst(wtxn, &new_words_fst)?; + index.put_headers(wtxn, &headers)?; Ok(()) } @@ -392,10 +154,6 @@ fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> any fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); - if let Some(jobs) = opt.jobs { - rayon::ThreadPoolBuilder::new().num_threads(jobs).build_global()?; - } - std::fs::create_dir_all(&opt.database)?; let env = EnvOpenOptions::new() .map_size(100 * 1024 * 1024 * 1024) // 100 GB @@ -405,23 +163,24 @@ fn main() -> anyhow::Result<()> { let index = Index::new(&env)?; - let stores: Vec<_> = opt.files_to_index - .into_par_iter() - .map(|path| { - let rdr = csv::Reader::from_path(path)?; - index_csv(rdr) - }) - .inspect(|_| { - eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) - }) - .collect::>()?; - - eprintln!("We are writing into LMDB..."); let mut wtxn = env.write_txn()?; - MtblKvStore::from_many(stores, |k, v| writer(&mut wtxn, &index, k, v))?; + + match opt.csv_file { + Some(path) => { + let rdr = csv::Reader::from_path(path)?; + index_csv(&mut wtxn, rdr, &index)?; + }, + None => { + let rdr = csv::Reader::from_reader(io::stdin()); + index_csv(&mut wtxn, rdr, &index)?; + } + }; + compute_words_attributes_docids(&mut wtxn, &index)?; let count = index.documents.len(&wtxn)?; + wtxn.commit()?; + eprintln!("Wrote {} documents into LMDB", count); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 87f07a1ff..e003a0677 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,10 +63,18 @@ impl Index { }) } + pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> { + Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers)?) + } + pub fn headers<'t>(&self, rtxn: &'t heed::RoTxn) -> heed::Result> { self.main.get::<_, Str, ByteSlice>(rtxn, "headers") } + pub fn put_fst>(&self, wtxn: &mut heed::RwTxn, fst: &fst::Set) -> anyhow::Result<()> { + Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", fst.as_fst().as_bytes())?) + } + pub fn fst<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result>> { match self.main.get::<_, Str, ByteSlice>(rtxn, "words-fst")? { Some(bytes) => Ok(Some(fst::Set::new(bytes)?)),