Fix the word_docids capacity limit detection

Clément Renault 2020-09-27 11:46:52 +02:00
parent 25b2853b70
commit d8354f6f02
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 29 additions and 31 deletions

View File

@@ -91,7 +91,7 @@ struct IndexerOpt {
     /// Size of the linked hash map cache when indexing.
     /// The bigger it is, the faster the indexing is but the more memory it takes.
-    #[structopt(long, default_value = "4096")]
+    #[structopt(long, default_value = "1048576")]
     linked_hash_map_size: usize,

     /// The name of the compression algorithm to use when compressing intermediate
@@ -169,9 +169,10 @@ type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
 struct Store {
     word_docids: LinkedHashMap<SmallVec32<u8>, RoaringBitmap>,
+    word_docids_limit: usize,
     documents_ids: RoaringBitmap,
     sorter: Sorter<MergeFn>,
-    documents_sorter: Sorter<MergeFn>,
+    documents_writer: Writer<File>,
     chunk_compression_type: CompressionType,
     chunk_compression_level: Option<u32>,
 }
@@ -183,7 +184,7 @@ impl Store {
         max_memory: Option<usize>,
         chunk_compression_type: CompressionType,
         chunk_compression_level: Option<u32>,
-    ) -> Store
+    ) -> anyhow::Result<Store>
     {
         let mut builder = Sorter::builder(merge as MergeFn);
         builder.chunk_compression_type(chunk_compression_type);
@@ -197,35 +198,36 @@ impl Store {
             builder.max_memory(memory);
         }

-        let mut documents_builder = Sorter::builder(docs_merge as MergeFn);
-        documents_builder.chunk_compression_type(chunk_compression_type);
+        let mut documents_builder = Writer::builder();
+        documents_builder.compression_type(chunk_compression_type);
         if let Some(level) = chunk_compression_level {
-            builder.chunk_compression_level(level);
+            documents_builder.compression_level(level);
         }
+        let documents_writer = tempfile::tempfile().map(|f| documents_builder.build(f))?;

-        Store {
-            // We overflow by one before poping the LRU element.
-            word_docids: LinkedHashMap::with_capacity(linked_hash_map_size + 1),
+        Ok(Store {
+            word_docids: LinkedHashMap::with_capacity(linked_hash_map_size),
+            word_docids_limit: linked_hash_map_size,
             documents_ids: RoaringBitmap::new(),
             sorter: builder.build(),
-            documents_sorter: documents_builder.build(),
+            documents_writer,
             chunk_compression_type,
             chunk_compression_level,
-        }
+        })
     }

     // Save the documents ids under the position and word we have seen it.
     fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> {
-        let word_vec = SmallVec32::from(word.as_bytes());
         // if get_refresh finds the element it is assured to be at the end of the linked hash map.
-        match self.word_docids.get_refresh(&word_vec) {
+        match self.word_docids.get_refresh(word.as_bytes()) {
             Some(old) => { old.insert(id); },
             None => {
+                let word_vec = SmallVec32::from(word.as_bytes());
                 // A newly inserted element is append at the end of the linked hash map.
                 self.word_docids.insert(word_vec, RoaringBitmap::from_iter(Some(id)));
                 // If the word docids just reached it's capacity we must make sure to remove
                 // one element, this way next time we insert we doesn't grow the capacity.
-                if self.word_docids.len() == self.word_docids.capacity() {
+                if self.word_docids.len() == self.word_docids_limit {
                     // Removing the front element is equivalent to removing the LRU element.
                     let lru = self.word_docids.pop_front();
                     Self::write_word_docids(&mut self.sorter, lru)?;
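
This hunk is the heart of the fix. `LinkedHashMap::with_capacity(n)` only guarantees room for at least n entries; `capacity()` reports whatever the allocator actually reserved, which is usually larger than the requested size, so comparing `len()` against it makes the eviction threshold unpredictable. Storing the requested size in the new `word_docids_limit` field makes the LRU eviction trigger exactly at the configured size. Below is a minimal, self-contained sketch of the same bounded-LRU pattern, assuming the `linked-hash-map` crate; the `flush` callback is a hypothetical stand-in for `Store::write_word_docids`, not part of the commit.

use linked_hash_map::LinkedHashMap;

struct BoundedCache {
    entries: LinkedHashMap<String, Vec<u32>>,
    // Stored explicitly: `entries.capacity()` is only a lower bound chosen by
    // the allocator, so checking `len() == capacity()` is unreliable.
    limit: usize,
}

impl BoundedCache {
    fn new(limit: usize) -> BoundedCache {
        BoundedCache { entries: LinkedHashMap::with_capacity(limit), limit }
    }

    fn insert(&mut self, word: &str, id: u32, flush: &mut dyn FnMut(String, Vec<u32>)) {
        // get_refresh moves an existing entry to the back of the map, so the
        // front entry is always the least recently used one.
        match self.entries.get_refresh(word) {
            Some(ids) => ids.push(id),
            None => {
                self.entries.insert(word.to_owned(), vec![id]);
                // Once the explicit limit is reached, evict the LRU entry so
                // the next insertion cannot grow the map past the limit.
                if self.entries.len() == self.limit {
                    if let Some((key, ids)) = self.entries.pop_front() {
                        flush(key, ids);
                    }
                }
            }
        }
    }
}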
@@ -261,7 +263,7 @@ impl Store {
             .with_context(|| format!("could not encode CSV record"))?;

         self.documents_ids.insert(document_id);
-        self.documents_sorter.insert(document_id.to_be_bytes(), record)?;
+        self.documents_writer.insert(document_id.to_be_bytes(), record)?;
         Self::write_docid_word_positions(&mut self.sorter, document_id, words_positions)?;

         Ok(())
@@ -436,10 +438,7 @@ impl Store {
         let fst = builder.into_set();
         wtr.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?;

-        let docs_wtr_file = tempfile::tempfile()?;
-        let mut docs_wtr = create_writer(compression_type, compression_level, docs_wtr_file);
-        self.documents_sorter.write_into(&mut docs_wtr)?;
-        let docs_file = docs_wtr.into_inner()?;
+        let docs_file = self.documents_writer.into_inner()?;
         let docs_mmap = unsafe { Mmap::map(&docs_file)? };
         let docs_reader = Reader::new(docs_mmap)?;
@@ -451,12 +450,6 @@ impl Store {
     }
 }

-fn docs_merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
-    let key = key.try_into().unwrap();
-    let id = u32::from_be_bytes(key);
-    panic!("documents must not conflict ({} with {} values)!", id, values.len())
-}
-
 fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
     match key {
         WORDS_FST_KEY => {
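
The two hunks above go together: documents are keyed by unique ids, so the removed `docs_merge` function could only ever panic. The documents `Sorter` is therefore replaced by a `Writer` that streams each record straight into an anonymous temporary file, which is memory-mapped and wrapped in a `Reader` at the end. Here is a rough sketch of that flow, under the assumption that `Writer`, `Reader`, `Mmap` and `CompressionType` are the same types this file already uses (grenad and memmap); the helper name and its signature are made up for illustration, not taken from the commit.

use std::fs::File;

fn write_documents(
    docs: &[(u32, Vec<u8>)],
    compression_type: CompressionType,
    compression_level: Option<u32>,
) -> anyhow::Result<Reader<Mmap>> {
    let mut builder = Writer::builder();
    builder.compression_type(compression_type);
    if let Some(level) = compression_level {
        builder.compression_level(level);
    }
    // Stream every record straight into an anonymous temporary file;
    // document ids are unique, so no merge function is needed.
    let mut writer = tempfile::tempfile().map(|file| builder.build(file))?;
    for (id, record) in docs {
        // Big-endian ids keep the lexicographic key order equal to the numeric order.
        writer.insert(id.to_be_bytes(), record)?;
    }
    // Recover the underlying file, memory-map it and expose it through a Reader.
    let file: File = writer.into_inner()?;
    let mmap = unsafe { Mmap::map(&file)? };
    Ok(Reader::new(mmap)?)
}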
@@ -633,13 +626,14 @@ fn main() -> anyhow::Result<()> {
         .into_par_iter()
         .enumerate()
         .map(|(i, rdr)| {
-            Store::new(
+            let store = Store::new(
                 linked_hash_map_size,
                 max_nb_chunks,
                 Some(max_memory),
                 chunk_compression_type,
                 chunk_compression_level,
-            ).index_csv(rdr, i, num_threads)
+            )?;
+            store.index_csv(rdr, i, num_threads)
         })
         .collect::<Result<Vec<_>, _>>()?;
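
Since `Store::new` now returns `anyhow::Result<Store>`, the rayon closure propagates the constructor failure with `?` and the outer `collect::<Result<Vec<_>, _>>()` surfaces the first error from any thread. A tiny, self-contained illustration of that pattern follows; the values and the `ensure!` check are invented for the example.

use rayon::prelude::*;

// Hypothetical example of a fallible step inside a parallel map, with the
// per-thread errors collected into a single Result, as in the indexing loop above.
fn run() -> anyhow::Result<()> {
    let sizes = vec![10usize, 20, 30];
    let lengths = sizes
        .into_par_iter()
        .enumerate()
        .map(|(i, size)| {
            let store = vec![0u8; size]; // stand-in for Store::new(...)?
            anyhow::ensure!(!store.is_empty(), "empty store in task {}", i);
            Ok(store.len())
        })
        .collect::<anyhow::Result<Vec<_>>>()?;
    println!("{:?}", lengths);
    Ok(())
}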

View File

@@ -217,10 +217,14 @@ impl<'a> Search<'a> {
             eprintln!("found pairs {:?}", pairs);

             let mut pairs_union = RoaringBitmap::new();
-            for (w1, w2) in pairs {
-                let key = (w1, w2, 1);
-                if let Some(docids) = index.word_pair_proximity_docids.get(rtxn, &key)? {
-                    pairs_union.union_with(&docids);
+            'pairs: for (w1, w2) in pairs {
+                for prox in 1..=7 {
+                    let key = (w1, w2, prox);
+                    eprintln!("{:?}", key);
+                    if let Some(docids) = index.word_pair_proximity_docids.get(rtxn, &key)? {
+                        pairs_union.union_with(&docids);
+                        continue 'pairs;
+                    }
                 }
             }
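
On the search side, each word pair was previously looked up at proximity 1 only; it now scans proximities 1 through 7 and keeps the documents of the first level that matches, jumping to the next pair with a labeled `continue`. A self-contained sketch of the same control flow, with a plain `HashMap` standing in for the `word_pair_proximity_docids` database and plain id vectors in place of roaring bitmaps:

use std::collections::HashMap;

// For each pair, take the documents of the smallest proximity level that has
// any, then move on to the next pair.
fn first_proximity_hits(
    pairs: &[(&str, &str)],
    db: &HashMap<(String, String, u8), Vec<u32>>,
) -> Vec<u32> {
    let mut union = Vec::new();
    'pairs: for &(w1, w2) in pairs {
        for prox in 1..=7u8 {
            let key = (w1.to_owned(), w2.to_owned(), prox);
            if let Some(docids) = db.get(&key) {
                union.extend_from_slice(docids);
                // Found the closest proximity for this pair; skip the larger ones.
                continue 'pairs;
            }
        }
    }
    union
}

The sketch only concatenates ids where the real code unions RoaringBitmaps, but the labeled-continue shape is the same.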