Compute the word attribute postings lists on each threads

This commit is contained in:
Clément Renault 2020-08-06 11:08:24 +02:00
parent 8d734941af
commit d3b1096510
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 108 additions and 109 deletions

View File

@ -1,6 +1,4 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use std::fs::File;
use std::iter::FromIterator;
use std::path::PathBuf;
@ -9,7 +7,7 @@ use std::time::Instant;
use anyhow::Context;
use arc_cache::ArcCache;
use cow_utils::CowUtils;
use fst::{Streamer, IntoStreamer};
use fst::IntoStreamer;
use heed::EnvOpenOptions;
use heed::types::*;
use log::debug;
@ -20,7 +18,7 @@ use roaring::RoaringBitmap;
use slice_group_by::StrGroupBy;
use structopt::StructOpt;
use milli::{SmallVec32, Index, DocumentId, Position};
use milli::{SmallVec32, Index, DocumentId, Position, Attribute};
const LMDB_MAX_KEY_LENGTH: usize = 512;
const ONE_MILLION: usize = 1_000_000;
@ -29,7 +27,11 @@ const MAX_POSITION: usize = 1000;
const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
const HEADERS_KEY: &[u8] = b"\0headers";
const WORDS_FST_KEY: &[u8] = b"\x06words-fst";
const WORDS_FST_KEY: &[u8] = b"\x05words-fst";
const WORD_POSITIONS_BYTE: u8 = 1;
const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
const DOCUMENT_BYTE: u8 = 4;
#[cfg(target_os = "linux")]
#[global_allocator]
@ -60,6 +62,10 @@ struct Opt {
#[structopt(long)]
max_memory: Option<usize>,
/// Size of the ARC cache when indexing.
#[structopt(long)]
arc_cache_size: Option<usize>,
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
verbose: usize,
@ -77,11 +83,12 @@ type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
struct Store {
word_positions: ArcCache<SmallVec32<u8>, RoaringBitmap>,
word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>,
word_attribute_docids: ArcCache<(SmallVec32<u8>, Attribute), RoaringBitmap>,
sorter: Sorter<MergeFn>,
}
impl Store {
fn new(max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store {
fn new(arc_cache_size: Option<usize>, max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store {
let mut builder = Sorter::builder(merge as MergeFn);
builder.chunk_compression_type(CompressionType::Snappy);
@ -94,9 +101,12 @@ impl Store {
builder.max_memory(memory);
}
let arc_cache_size = arc_cache_size.unwrap_or(65_535);
Store {
word_positions: ArcCache::new(65_535),
word_position_docids: ArcCache::new(65_535),
word_positions: ArcCache::new(arc_cache_size),
word_position_docids: ArcCache::new(arc_cache_size),
word_attribute_docids: ArcCache::new(arc_cache_size),
sorter: builder.build(),
}
}
@ -111,10 +121,19 @@ impl Store {
// Save the documents ids under the position and word we have seen it.
pub fn insert_word_position_docid(&mut self, word: &str, position: Position, id: DocumentId) -> anyhow::Result<()> {
let word_vec = SmallVec32::from(word.as_bytes());
let ids = RoaringBitmap::from_iter(Some(id));
let (_, lrus) = self.word_position_docids.insert((word_vec, position), ids, |old, new| old.union_with(&new));
Self::write_word_position_docids(&mut self.sorter, lrus)?;
self.insert_word_attribute_docid(word, position / MAX_POSITION as u32, id)
}
// Save the documents ids under the attribute and word we have seen it.
fn insert_word_attribute_docid(&mut self, word: &str, attribute: Attribute, id: DocumentId) -> anyhow::Result<()> {
let word = SmallVec32::from(word.as_bytes());
let ids = RoaringBitmap::from_iter(Some(id));
let (_, lrus) = self.word_position_docids.insert((word, position), ids, |old, new| old.union_with(&new));
Self::write_word_position_docids(&mut self.sorter, lrus)
let (_, lrus) = self.word_attribute_docids.insert((word, attribute), ids, |old, new| old.union_with(&new));
Self::write_word_attribute_docids(&mut self.sorter, lrus)
}
pub fn write_headers(&mut self, headers: &[u8]) -> anyhow::Result<()> {
@ -125,8 +144,7 @@ impl Store {
let id = id.to_be_bytes();
let mut key = Vec::with_capacity(1 + id.len());
// postings ids keys are all prefixed by a '5'
key.push(5);
key.push(DOCUMENT_BYTE);
key.extend_from_slice(&id);
Ok(self.sorter.insert(&key, content)?)
@ -135,8 +153,8 @@ impl Store {
fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=(SmallVec32<u8>, RoaringBitmap)>
{
// postings ids keys are all prefixed by a '1'
let mut key = vec![1];
// postings ids keys are all prefixed
let mut key = vec![WORD_POSITIONS_BYTE];
let mut buffer = Vec::new();
for (word, positions) in iter {
@ -157,8 +175,8 @@ impl Store {
fn write_word_position_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=((SmallVec32<u8>, Position), RoaringBitmap)>
{
// postings positions ids keys are all prefixed by a '3'
let mut key = vec![3];
// postings positions ids keys are all prefixed
let mut key = vec![WORD_POSITION_DOCIDS_BYTE];
let mut buffer = Vec::new();
for ((word, pos), ids) in iter {
@ -178,9 +196,34 @@ impl Store {
Ok(())
}
fn write_word_attribute_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
where I: IntoIterator<Item=((SmallVec32<u8>, Attribute), RoaringBitmap)>
{
// postings attributes keys are all prefixed
let mut key = vec![WORD_ATTRIBUTE_DOCIDS_BYTE];
let mut buffer = Vec::new();
for ((word, attr), ids) in iter {
key.truncate(1);
key.extend_from_slice(&word);
// we postfix the word by the positions it appears in
key.extend_from_slice(&attr.to_be_bytes());
// We serialize the document ids into a buffer
buffer.clear();
ids.serialize_into(&mut buffer)?;
// that we write under the generated key into MTBL
if lmdb_key_valid_size(&key) {
sorter.insert(&key, &buffer)?;
}
}
Ok(())
}
pub fn finish(mut self) -> anyhow::Result<Reader<Mmap>> {
Self::write_word_positions(&mut self.sorter, self.word_positions)?;
Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?;
let mut wtr = tempfile::tempfile().map(Writer::new)?;
let mut builder = fst::SetBuilder::memory();
@ -208,45 +251,47 @@ impl Store {
}
fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
if key == WORDS_FST_KEY {
let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect();
match key {
WORDS_FST_KEY => {
let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect();
// Union of the two FSTs
let mut op = fst::set::OpBuilder::new();
fsts.iter().for_each(|fst| op.push(fst.into_stream()));
let op = op.r#union();
// Union of the two FSTs
let mut op = fst::set::OpBuilder::new();
fsts.iter().for_each(|fst| op.push(fst.into_stream()));
let op = op.r#union();
let mut build = fst::SetBuilder::memory();
build.extend_stream(op.into_stream()).unwrap();
Ok(build.into_inner().unwrap())
}
else if key == HEADERS_KEY {
assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
Ok(values[0].to_vec())
}
// We either merge postings attrs, prefix postings or postings ids.
else if key[0] == 1 || key[0] == 2 || key[0] == 3 || key[0] == 4 {
let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
let mut build = fst::SetBuilder::memory();
build.extend_stream(op.into_stream()).unwrap();
Ok(build.into_inner().unwrap())
},
HEADERS_KEY => {
assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
Ok(values[0].to_vec())
},
key => match key[0] {
WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
for value in &values[1..] {
let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap();
first.union_with(&bitmap);
for value in &values[1..] {
let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap();
first.union_with(&bitmap);
}
let mut vec = Vec::new();
first.serialize_into(&mut vec).unwrap();
Ok(vec)
},
DOCUMENT_BYTE => {
assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
Ok(values[0].to_vec())
},
otherwise => panic!("wut {:?}", otherwise),
}
let mut vec = Vec::new();
first.serialize_into(&mut vec).unwrap();
Ok(vec)
}
else if key[0] == 5 {
assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
Ok(values[0].to_vec())
}
else {
panic!("wut? {:?}", key)
}
}
// TODO merge with the previous values
// TODO store the documents in a compressed MTBL
fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
if key == WORDS_FST_KEY {
// Write the words fst
@ -256,27 +301,22 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) ->
// Write the headers
index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
}
else if key.starts_with(&[1]) {
else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
// Write the postings lists
index.word_positions.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[2]) {
// Write the prefix postings lists
index.prefix_word_positions.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[3]) {
else if key.starts_with(&[WORD_POSITION_DOCIDS_BYTE]) {
// Write the postings lists
index.word_position_docids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[4]) {
// Write the prefix postings lists
index.prefix_word_position_docids.as_polymorph()
else if key.starts_with(&[WORD_ATTRIBUTE_DOCIDS_BYTE]) {
// Write the attribute postings lists
index.word_attribute_docids.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
}
else if key.starts_with(&[5]) {
else if key.starts_with(&[DOCUMENT_BYTE]) {
// Write the documents
index.documents.as_polymorph()
.put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
@ -309,13 +349,14 @@ fn index_csv(
mut rdr: csv::Reader<File>,
thread_index: usize,
num_threads: usize,
arc_cache_size: Option<usize>,
max_nb_chunks: Option<usize>,
max_memory: Option<usize>,
) -> anyhow::Result<Reader<Mmap>>
{
debug!("{:?}: Indexing into an Indexed...", thread_index);
let mut store = Store::new(max_nb_chunks, max_memory);
let mut store = Store::new(arc_cache_size, max_nb_chunks, max_memory);
// Write the headers into a Vec of bytes and then into the store.
let headers = rdr.headers()?;
@ -324,6 +365,7 @@ fn index_csv(
let headers = writer.into_inner()?;
store.write_headers(&headers)?;
let mut before = Instant::now();
let mut document_id: usize = 0;
let mut document = csv::StringRecord::new();
while rdr.read_record(&mut document)? {
@ -334,7 +376,9 @@ fn index_csv(
let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
if document_id % (ONE_MILLION as u32) == 0 {
debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32);
debug!("We have seen {}m documents so far ({:.02?}).",
document_id / ONE_MILLION as u32, before.elapsed());
before = Instant::now();
}
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
@ -358,51 +402,6 @@ fn index_csv(
Ok(reader)
}
// TODO do that in the threads.
fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> anyhow::Result<()> {
let before = Instant::now();
debug!("Computing the attributes documents ids...");
let fst = match index.fst(&wtxn)? {
Some(fst) => fst.map_data(|s| s.to_vec())?,
None => return Ok(()),
};
let mut word_attributes = HashMap::new();
let mut stream = fst.stream();
while let Some(word) = stream.next() {
word_attributes.clear();
// Loop on the word attributes and unions all the documents ids by attribute.
for result in index.word_position_docids.prefix_iter(wtxn, word)? {
let (key, docids) = result?;
let (_key_word, key_pos) = key.split_at(key.len() - 4);
let key_pos = key_pos.try_into().map(u32::from_be_bytes)?;
// If the key corresponds to the word (minus the attribute)
if key.len() == word.len() + 4 {
let attribute = key_pos / MAX_POSITION as u32;
match word_attributes.entry(attribute) {
Entry::Vacant(entry) => { entry.insert(docids); },
Entry::Occupied(mut entry) => entry.get_mut().union_with(&docids),
}
}
}
// Write this word attributes unions into LMDB.
let mut key = word.to_vec();
for (attribute, docids) in word_attributes.drain() {
key.truncate(word.len());
key.extend_from_slice(&attribute.to_be_bytes());
index.word_attribute_docids.put(wtxn, &key, &docids)?;
}
}
debug!("Computing the attributes documents ids took {:.02?}.", before.elapsed());
Ok(())
}
fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
@ -426,6 +425,7 @@ fn main() -> anyhow::Result<()> {
let index = Index::new(&env)?;
let num_threads = rayon::current_num_threads();
let arc_cache_size = opt.arc_cache_size;
let max_nb_chunks = opt.max_nb_chunks;
let max_memory = opt.max_memory;
@ -438,14 +438,13 @@ fn main() -> anyhow::Result<()> {
let stores = csv_readers
.into_par_iter()
.enumerate()
.map(|(i, rdr)| index_csv(rdr, i, num_threads, max_nb_chunks, max_memory))
.map(|(i, rdr)| index_csv(rdr, i, num_threads, arc_cache_size, max_nb_chunks, max_memory))
.collect::<Result<_, _>>()?;
debug!("We are writing into LMDB...");
let mut wtxn = env.write_txn()?;
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
compute_words_attributes_docids(&mut wtxn, &index)?;
let count = index.documents.len(&wtxn)?;
wtxn.commit()?;

View File

@ -34,7 +34,7 @@ pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>;
pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>;
pub type DocumentId = u32;
pub type AttributeId = u32;
pub type Attribute = u32;
pub type Position = u32;
#[derive(Clone)]