Split wpp in several sorters

This commit is contained in:
ManyTheFish 2023-09-28 10:45:25 +02:00
parent 27161bcd05
commit 20394fda04

View File

@ -5,8 +5,8 @@ use std::{cmp, io};
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader,
GrenadParameters, MergeFn, try_split_array_at, writer_into_reader, GrenadParameters, MergeFn,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
@ -26,14 +26,19 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorter = create_sorter( let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
.into_iter()
.map(|_| {
create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory.map(|m| m / 2), max_memory.map(|m| m / MAX_DISTANCE as usize),
); )
})
.collect();
let mut word_positions: VecDeque<(String, u16)> = let mut word_positions: VecDeque<(String, u16)> =
VecDeque::with_capacity(MAX_DISTANCE as usize); VecDeque::with_capacity(MAX_DISTANCE as usize);
@ -48,6 +53,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
// if we change document, we fill the sorter // if we change document, we fill the sorter
if current_document_id.map_or(false, |id| id != document_id) { if current_document_id.map_or(false, |id| id != document_id) {
puffin::profile_scope!("Document into sorter");
while !word_positions.is_empty() { while !word_positions.is_empty() {
word_positions_into_word_pair_proximity( word_positions_into_word_pair_proximity(
&mut word_positions, &mut word_positions,
@ -58,7 +64,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
document_word_positions_into_sorter( document_word_positions_into_sorter(
current_document_id.unwrap(), current_document_id.unwrap(),
&word_pair_proximity, &word_pair_proximity,
&mut word_pair_proximity_docids_sorter, &mut word_pair_proximity_docids_sorters,
)?; )?;
word_pair_proximity.clear(); word_pair_proximity.clear();
word_positions.clear(); word_positions.clear();
@ -84,6 +90,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
} }
if let Some(document_id) = current_document_id { if let Some(document_id) = current_document_id {
puffin::profile_scope!("Final document into sorter");
while !word_positions.is_empty() { while !word_positions.is_empty() {
word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?; word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?;
} }
@ -91,11 +98,23 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
document_word_positions_into_sorter( document_word_positions_into_sorter(
document_id, document_id,
&word_pair_proximity, &word_pair_proximity,
&mut word_pair_proximity_docids_sorter, &mut word_pair_proximity_docids_sorters,
)?; )?;
} }
{
puffin::profile_scope!("sorter_into_reader");
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
sorter_into_reader(word_pair_proximity_docids_sorter, indexer) for sorter in word_pair_proximity_docids_sorters {
sorter.write_into_stream_writer(&mut writer)?;
}
writer_into_reader(writer)
}
} }
/// Fills the list of all pairs of words with the shortest proximity between 1 and 7 inclusive. /// Fills the list of all pairs of words with the shortest proximity between 1 and 7 inclusive.
@ -105,9 +124,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
fn document_word_positions_into_sorter( fn document_word_positions_into_sorter(
document_id: DocumentId, document_id: DocumentId,
word_pair_proximity: &HashMap<(String, String), u8>, word_pair_proximity: &HashMap<(String, String), u8>,
word_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>, word_pair_proximity_docids_sorters: &mut Vec<grenad::Sorter<MergeFn>>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
for ((w1, w2), prox) in word_pair_proximity { for ((w1, w2), prox) in word_pair_proximity {
key_buffer.clear(); key_buffer.clear();
@ -116,7 +134,8 @@ fn document_word_positions_into_sorter(
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(w2.as_bytes()); key_buffer.extend_from_slice(w2.as_bytes());
word_pair_proximity_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert(&key_buffer, document_id.to_ne_bytes())?;
} }
Ok(()) Ok(())
@ -126,10 +145,10 @@ fn word_positions_into_word_pair_proximity(
word_positions: &mut VecDeque<(String, u16)>, word_positions: &mut VecDeque<(String, u16)>,
word_pair_proximity: &mut HashMap<(String, String), u8>, word_pair_proximity: &mut HashMap<(String, String), u8>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!();
let (head_word, head_position) = word_positions.pop_front().unwrap(); let (head_word, head_position) = word_positions.pop_front().unwrap();
for (word, position) in word_positions.iter() { for (word, position) in word_positions.iter() {
let prox = index_proximity(head_position as u32, *position as u32) as u8; let prox = index_proximity(head_position as u32, *position as u32) as u8;
if prox > 0 && prox < MAX_DISTANCE as u8 {
word_pair_proximity word_pair_proximity
.entry((head_word.clone(), word.clone())) .entry((head_word.clone(), word.clone()))
.and_modify(|p| { .and_modify(|p| {
@ -137,5 +156,6 @@ fn word_positions_into_word_pair_proximity(
}) })
.or_insert(prox); .or_insert(prox);
} }
}
Ok(()) Ok(())
} }