mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-04 18:45:46 +01:00
Wip
This commit is contained in:
parent
c0fd3dffb8
commit
01101d55ac
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2704,6 +2704,7 @@ dependencies = [
|
|||||||
"logging_timer",
|
"logging_timer",
|
||||||
"maplit",
|
"maplit",
|
||||||
"md5",
|
"md5",
|
||||||
|
"meili-snap",
|
||||||
"memmap2",
|
"memmap2",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
"obkv",
|
"obkv",
|
||||||
|
@ -79,6 +79,7 @@ big_s = "1.0.2"
|
|||||||
insta = "1.29.0"
|
insta = "1.29.0"
|
||||||
maplit = "1.0.2"
|
maplit = "1.0.2"
|
||||||
md5 = "0.7.0"
|
md5 = "0.7.0"
|
||||||
|
meili-snap = { path = "../meili-snap" }
|
||||||
rand = { version = "0.8.5", features = ["small_rng"] }
|
rand = { version = "0.8.5", features = ["small_rng"] }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
@ -13,6 +13,7 @@ This module tests the `sort` ranking rule:
|
|||||||
|
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
use maplit::hashset;
|
use maplit::hashset;
|
||||||
|
use meili_snap::insta;
|
||||||
|
|
||||||
use crate::index::tests::TempIndex;
|
use crate::index::tests::TempIndex;
|
||||||
use crate::search::new::tests::collect_field_values;
|
use crate::search::new::tests::collect_field_values;
|
||||||
|
@ -4,11 +4,11 @@ use std::fs::File;
|
|||||||
use std::{io, mem, str};
|
use std::{io, mem, str};
|
||||||
|
|
||||||
use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
|
use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
|
||||||
use obkv::KvReader;
|
use obkv::{KvReader, KvWriterU16};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::helpers::{concat_u32s_array, create_sorter, sorter_into_reader, GrenadParameters};
|
use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters};
|
||||||
use crate::error::{InternalError, SerializationError};
|
use crate::error::{InternalError, SerializationError};
|
||||||
use crate::update::index_documents::MergeFn;
|
use crate::update::index_documents::MergeFn;
|
||||||
use crate::{
|
use crate::{
|
||||||
@ -42,7 +42,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
|||||||
let mut script_language_docids = HashMap::new();
|
let mut script_language_docids = HashMap::new();
|
||||||
let mut docid_word_positions_sorter = create_sorter(
|
let mut docid_word_positions_sorter = create_sorter(
|
||||||
grenad::SortAlgorithm::Stable,
|
grenad::SortAlgorithm::Stable,
|
||||||
concat_u32s_array,
|
keep_latest_obkv,
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
@ -155,6 +155,7 @@ fn extract_tokens_from_document(
|
|||||||
let tokens = process_tokens(tokenizer.tokenize(field))
|
let tokens = process_tokens(tokenizer.tokenize(field))
|
||||||
.take_while(|(p, _)| (*p as u32) < max_positions_per_attributes);
|
.take_while(|(p, _)| (*p as u32) < max_positions_per_attributes);
|
||||||
|
|
||||||
|
let mut writer = KvWriterU16::memory();
|
||||||
for (index, token) in tokens {
|
for (index, token) in tokens {
|
||||||
// if a language has been detected for the token, we update the counter.
|
// if a language has been detected for the token, we update the counter.
|
||||||
if let Some(language) = token.language {
|
if let Some(language) = token.language {
|
||||||
@ -168,17 +169,17 @@ fn extract_tokens_from_document(
|
|||||||
}
|
}
|
||||||
let token = token.lemma().trim();
|
let token = token.lemma().trim();
|
||||||
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
|
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
|
||||||
buffers.key_buffer.truncate(mem::size_of::<u32>());
|
|
||||||
buffers.key_buffer.extend_from_slice(token.as_bytes());
|
|
||||||
|
|
||||||
let position: u16 = index
|
let position: u16 = index
|
||||||
.try_into()
|
.try_into()
|
||||||
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
|
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
|
||||||
let position = absolute_from_relative_position(field_id, position);
|
writer.insert(position, token.as_bytes())?;
|
||||||
docid_word_positions_sorter
|
|
||||||
.insert(&buffers.key_buffer, position.to_ne_bytes())?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let positions = writer.into_inner()?;
|
||||||
|
buffers.key_buffer.truncate(mem::size_of::<u32>());
|
||||||
|
buffers.key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||||
|
docid_word_positions_sorter.insert(&buffers.key_buffer, positions)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
use std::collections::HashMap;
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use grenad::Sorter;
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
|
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
||||||
try_split_array_at, GrenadParameters, MergeFn,
|
GrenadParameters,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
|
use crate::Result;
|
||||||
|
|
||||||
|
const MAX_COUNTED_WORDS: usize = 30;
|
||||||
|
|
||||||
/// Extracts the field id word count and the documents ids where
|
/// Extracts the field id word count and the documents ids where
|
||||||
/// this field id with this amount of words appear.
|
/// this field id with this amount of words appear.
|
||||||
@ -35,63 +36,21 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
|
|||||||
max_memory,
|
max_memory,
|
||||||
);
|
);
|
||||||
|
|
||||||
// This map is assumed to not consume a lot of memory.
|
let mut key_buffer = Vec::new();
|
||||||
let mut document_fid_wordcount = HashMap::new();
|
|
||||||
let mut current_document_id = None;
|
|
||||||
|
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
while let Some((key, value)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
let (document_id_bytes, _word_bytes) = try_split_array_at(key)
|
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
|
||||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||||
|
|
||||||
let curr_document_id = *current_document_id.get_or_insert(document_id);
|
let word_count = KvReaderU16::new(&value).iter().take(MAX_COUNTED_WORDS + 1).count();
|
||||||
if curr_document_id != document_id {
|
if word_count <= MAX_COUNTED_WORDS {
|
||||||
drain_document_fid_wordcount_into_sorter(
|
|
||||||
&mut fid_word_count_docids_sorter,
|
|
||||||
&mut document_fid_wordcount,
|
|
||||||
curr_document_id,
|
|
||||||
)?;
|
|
||||||
current_document_id = Some(document_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
for position in read_u32_ne_bytes(value) {
|
|
||||||
let (field_id, _) = relative_from_absolute_position(position);
|
|
||||||
|
|
||||||
let value = document_fid_wordcount.entry(field_id as FieldId).or_insert(0);
|
|
||||||
*value += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(document_id) = current_document_id {
|
|
||||||
// We must make sure that don't lose the current document field id
|
|
||||||
// word count map if we break because we reached the end of the chunk.
|
|
||||||
drain_document_fid_wordcount_into_sorter(
|
|
||||||
&mut fid_word_count_docids_sorter,
|
|
||||||
&mut document_fid_wordcount,
|
|
||||||
document_id,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
sorter_into_reader(fid_word_count_docids_sorter, indexer)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn drain_document_fid_wordcount_into_sorter(
|
|
||||||
fid_word_count_docids_sorter: &mut Sorter<MergeFn>,
|
|
||||||
document_fid_wordcount: &mut HashMap<FieldId, u32>,
|
|
||||||
document_id: DocumentId,
|
|
||||||
) -> Result<()> {
|
|
||||||
let mut key_buffer = Vec::new();
|
|
||||||
|
|
||||||
for (fid, count) in document_fid_wordcount.drain() {
|
|
||||||
if count <= 30 {
|
|
||||||
key_buffer.clear();
|
key_buffer.clear();
|
||||||
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
key_buffer.extend_from_slice(fid_bytes);
|
||||||
key_buffer.push(count as u8);
|
key_buffer.push(word_count as u8);
|
||||||
|
|
||||||
fid_word_count_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
|
fid_word_count_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
sorter_into_reader(fid_word_count_docids_sorter, indexer)
|
||||||
}
|
}
|
||||||
|
@ -1,18 +1,19 @@
|
|||||||
use std::collections::HashSet;
|
use std::collections::{BTreeSet, HashSet};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
|
|
||||||
|
use obkv::KvReaderU16;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_roaring_bitmaps, serialize_roaring_bitmap, sorter_into_reader,
|
create_sorter, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, serialize_roaring_bitmap,
|
||||||
try_split_array_at, GrenadParameters,
|
sorter_into_reader, try_split_array_at, GrenadParameters,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::update::index_documents::helpers::read_u32_ne_bytes;
|
use crate::update::MergeFn;
|
||||||
use crate::{relative_from_absolute_position, FieldId, Result};
|
use crate::{DocumentId, FieldId, Result};
|
||||||
|
|
||||||
/// Extracts the word and the documents ids where this word appear.
|
/// Extracts the word and the documents ids where this word appear.
|
||||||
///
|
///
|
||||||
@ -26,7 +27,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
docid_word_positions: grenad::Reader<R>,
|
docid_word_positions: grenad::Reader<R>,
|
||||||
indexer: GrenadParameters,
|
indexer: GrenadParameters,
|
||||||
exact_attributes: &HashSet<FieldId>,
|
exact_attributes: &HashSet<FieldId>,
|
||||||
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
|
) -> Result<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
|
||||||
puffin::profile_function!();
|
puffin::profile_function!();
|
||||||
|
|
||||||
let max_memory = indexer.max_memory_by_thread();
|
let max_memory = indexer.max_memory_by_thread();
|
||||||
@ -37,7 +38,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|x| x / 2),
|
max_memory.map(|x| x / 3),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut exact_word_docids_sorter = create_sorter(
|
let mut exact_word_docids_sorter = create_sorter(
|
||||||
@ -46,45 +47,116 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|x| x / 2),
|
max_memory.map(|x| x / 3),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut word_fid_docids_sorter = create_sorter(
|
||||||
|
grenad::SortAlgorithm::Unstable,
|
||||||
|
merge_roaring_bitmaps,
|
||||||
|
indexer.chunk_compression_type,
|
||||||
|
indexer.chunk_compression_level,
|
||||||
|
indexer.max_nb_chunks,
|
||||||
|
max_memory.map(|x| x / 3),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut current_document_id = None;
|
||||||
|
let mut fid = 0;
|
||||||
|
let mut key_buffer = Vec::new();
|
||||||
let mut value_buffer = Vec::new();
|
let mut value_buffer = Vec::new();
|
||||||
|
let mut words = BTreeSet::new();
|
||||||
|
let mut exact_words = BTreeSet::new();
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
while let Some((key, positions)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
let (document_id_bytes, word_bytes) = try_split_array_at(key)
|
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
|
||||||
|
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||||
|
let (fid_bytes, _) = try_split_array_at(key)
|
||||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||||
|
fid = u16::from_be_bytes(fid_bytes);
|
||||||
|
|
||||||
let bitmap = RoaringBitmap::from_iter(Some(document_id));
|
// drain the btreemaps when we change document.
|
||||||
serialize_roaring_bitmap(&bitmap, &mut value_buffer)?;
|
if current_document_id.map_or(false, |id| id != document_id) {
|
||||||
|
words_into_sorters(
|
||||||
|
document_id,
|
||||||
|
fid,
|
||||||
|
&mut key_buffer,
|
||||||
|
&mut value_buffer,
|
||||||
|
&mut exact_words,
|
||||||
|
&mut words,
|
||||||
|
&mut exact_word_docids_sorter,
|
||||||
|
&mut word_docids_sorter,
|
||||||
|
&mut word_fid_docids_sorter,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
// If there are no exact attributes, we do not need to iterate over positions.
|
current_document_id = Some(document_id);
|
||||||
if exact_attributes.is_empty() {
|
|
||||||
word_docids_sorter.insert(word_bytes, &value_buffer)?;
|
// every words contained in an attribute set to exact must be pushed in the exact_words list.
|
||||||
|
if exact_attributes.contains(&fid) {
|
||||||
|
for (_pos, word) in KvReaderU16::new(&value).iter() {
|
||||||
|
exact_words.insert(word.to_vec());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
let mut added_to_exact = false;
|
for (_pos, word) in KvReaderU16::new(&value).iter() {
|
||||||
let mut added_to_word_docids = false;
|
words.insert(word.to_vec());
|
||||||
for position in read_u32_ne_bytes(positions) {
|
|
||||||
// as soon as we know that this word had been to both readers, we don't need to
|
|
||||||
// iterate over the positions.
|
|
||||||
if added_to_exact && added_to_word_docids {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let (fid, _) = relative_from_absolute_position(position);
|
|
||||||
if exact_attributes.contains(&fid) && !added_to_exact {
|
|
||||||
exact_word_docids_sorter.insert(word_bytes, &value_buffer)?;
|
|
||||||
added_to_exact = true;
|
|
||||||
} else if !added_to_word_docids {
|
|
||||||
word_docids_sorter.insert(word_bytes, &value_buffer)?;
|
|
||||||
added_to_word_docids = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We must make sure that don't lose the current document field id
|
||||||
|
if let Some(document_id) = current_document_id {
|
||||||
|
words_into_sorters(
|
||||||
|
document_id,
|
||||||
|
fid,
|
||||||
|
&mut key_buffer,
|
||||||
|
&mut value_buffer,
|
||||||
|
&mut exact_words,
|
||||||
|
&mut words,
|
||||||
|
&mut exact_word_docids_sorter,
|
||||||
|
&mut word_docids_sorter,
|
||||||
|
&mut word_fid_docids_sorter,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
sorter_into_reader(word_docids_sorter, indexer)?,
|
sorter_into_reader(word_docids_sorter, indexer)?,
|
||||||
sorter_into_reader(exact_word_docids_sorter, indexer)?,
|
sorter_into_reader(exact_word_docids_sorter, indexer)?,
|
||||||
|
sorter_into_reader(word_fid_docids_sorter, indexer)?,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn words_into_sorters(
|
||||||
|
document_id: DocumentId,
|
||||||
|
fid: FieldId,
|
||||||
|
key_buffer: &mut Vec<u8>,
|
||||||
|
value_buffer: &mut Vec<u8>,
|
||||||
|
exact_words: &mut BTreeSet<Vec<u8>>,
|
||||||
|
words: &mut BTreeSet<Vec<u8>>,
|
||||||
|
exact_word_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||||
|
word_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||||
|
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||||
|
) -> Result<()> {
|
||||||
|
puffin::profile_function!();
|
||||||
|
let bitmap = RoaringBitmap::from_iter(Some(document_id));
|
||||||
|
serialize_roaring_bitmap(&bitmap, value_buffer)?;
|
||||||
|
for word_bytes in exact_words.iter() {
|
||||||
|
exact_word_docids_sorter.insert(word_bytes, &mut *value_buffer)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for word_bytes in words.iter() {
|
||||||
|
word_docids_sorter.insert(word_bytes, &value_buffer)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for word_bytes in (&*words | &*exact_words).iter() {
|
||||||
|
key_buffer.clear();
|
||||||
|
key_buffer.extend_from_slice(&word_bytes);
|
||||||
|
key_buffer.push(0);
|
||||||
|
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
||||||
|
word_fid_docids_sorter.insert(word_bytes, &value_buffer)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
exact_words.clear();
|
||||||
|
words.clear();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
@ -17,6 +17,8 @@ pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
|
|||||||
) -> Result<grenad::Reader<File>> {
|
) -> Result<grenad::Reader<File>> {
|
||||||
puffin::profile_function!();
|
puffin::profile_function!();
|
||||||
|
|
||||||
|
todo!("remove me");
|
||||||
|
|
||||||
let max_memory = indexer.max_memory_by_thread();
|
let max_memory = indexer.max_memory_by_thread();
|
||||||
|
|
||||||
let mut word_fid_docids_sorter = create_sorter(
|
let mut word_fid_docids_sorter = create_sorter(
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
use std::collections::{BinaryHeap, HashMap};
|
use std::collections::HashMap;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::{cmp, io, mem, str, vec};
|
use std::{cmp, io};
|
||||||
|
|
||||||
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
|
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
||||||
try_split_array_at, GrenadParameters, MergeFn,
|
GrenadParameters, MergeFn,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
@ -34,44 +36,59 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
max_memory.map(|m| m / 2),
|
max_memory.map(|m| m / 2),
|
||||||
);
|
);
|
||||||
|
|
||||||
// This map is assumed to not consume a lot of memory.
|
let mut word_positions: Vec<(String, u16)> = Vec::with_capacity(MAX_DISTANCE as usize);
|
||||||
let mut document_word_positions_heap = BinaryHeap::new();
|
let mut word_pair_proximity = HashMap::new();
|
||||||
let mut current_document_id = None;
|
let mut current_document_id = None;
|
||||||
|
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
while let Some((key, value)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
let (document_id_bytes, word_bytes) = try_split_array_at(key)
|
let (document_id_bytes, _fid_bytes) = try_split_array_at(key)
|
||||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||||
let document_id = u32::from_be_bytes(document_id_bytes);
|
let document_id = u32::from_be_bytes(document_id_bytes);
|
||||||
let word = str::from_utf8(word_bytes)?;
|
|
||||||
|
|
||||||
let curr_document_id = *current_document_id.get_or_insert(document_id);
|
for (position, word) in KvReaderU16::new(&value).iter() {
|
||||||
if curr_document_id != document_id {
|
// if we change document, we fill the sorter
|
||||||
let document_word_positions_heap = mem::take(&mut document_word_positions_heap);
|
if current_document_id.map_or(false, |id| id != document_id) {
|
||||||
document_word_positions_into_sorter(
|
while !word_positions.is_empty() {
|
||||||
curr_document_id,
|
word_positions_into_word_pair_proximity(
|
||||||
document_word_positions_heap,
|
&mut word_positions,
|
||||||
&mut word_pair_proximity_docids_sorter,
|
&mut word_pair_proximity,
|
||||||
)?;
|
)?;
|
||||||
current_document_id = Some(document_id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let word = word.to_string();
|
document_word_positions_into_sorter(
|
||||||
let mut positions: Vec<_> = read_u32_ne_bytes(value).collect();
|
document_id,
|
||||||
positions.sort_unstable();
|
&word_pair_proximity,
|
||||||
let mut iter = positions.into_iter();
|
&mut word_pair_proximity_docids_sorter,
|
||||||
if let Some(position) = iter.next() {
|
)?;
|
||||||
document_word_positions_heap.push(PeekedWordPosition { word, position, iter });
|
word_pair_proximity.clear();
|
||||||
|
word_positions.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// drain the proximity window until the head word is considered close to the word we are inserting.
|
||||||
|
while word_positions.get(0).map_or(false, |(_w, p)| {
|
||||||
|
positions_proximity(*p as u32, position as u32) > MAX_DISTANCE
|
||||||
|
}) {
|
||||||
|
word_positions_into_word_pair_proximity(
|
||||||
|
&mut word_positions,
|
||||||
|
&mut word_pair_proximity,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert the new word.
|
||||||
|
let word = std::str::from_utf8(word)?;
|
||||||
|
word_positions.push((word.to_string(), position));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(document_id) = current_document_id {
|
if let Some(document_id) = current_document_id {
|
||||||
// We must make sure that don't lose the current document field id
|
while !word_positions.is_empty() {
|
||||||
// word count map if we break because we reached the end of the chunk.
|
word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?;
|
||||||
let document_word_positions_heap = mem::take(&mut document_word_positions_heap);
|
}
|
||||||
|
|
||||||
document_word_positions_into_sorter(
|
document_word_positions_into_sorter(
|
||||||
document_id,
|
document_id,
|
||||||
document_word_positions_heap,
|
&word_pair_proximity,
|
||||||
&mut word_pair_proximity_docids_sorter,
|
&mut word_pair_proximity_docids_sorter,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@ -85,64 +102,13 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
/// close to each other.
|
/// close to each other.
|
||||||
fn document_word_positions_into_sorter(
|
fn document_word_positions_into_sorter(
|
||||||
document_id: DocumentId,
|
document_id: DocumentId,
|
||||||
mut word_positions_heap: BinaryHeap<PeekedWordPosition<vec::IntoIter<u32>>>,
|
word_pair_proximity: &HashMap<(String, String), u8>,
|
||||||
word_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
word_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut word_pair_proximity = HashMap::new();
|
|
||||||
let mut ordered_peeked_word_positions = Vec::new();
|
|
||||||
while !word_positions_heap.is_empty() {
|
|
||||||
while let Some(peeked_word_position) = word_positions_heap.pop() {
|
|
||||||
ordered_peeked_word_positions.push(peeked_word_position);
|
|
||||||
if ordered_peeked_word_positions.len() == 7 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some((head, tail)) = ordered_peeked_word_positions.split_first() {
|
|
||||||
for PeekedWordPosition { word, position, .. } in tail {
|
|
||||||
let prox = positions_proximity(head.position, *position);
|
|
||||||
if prox > 0 && prox < MAX_DISTANCE {
|
|
||||||
word_pair_proximity
|
|
||||||
.entry((head.word.clone(), word.clone()))
|
|
||||||
.and_modify(|p| {
|
|
||||||
*p = cmp::min(*p, prox);
|
|
||||||
})
|
|
||||||
.or_insert(prox);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push the tail in the heap.
|
|
||||||
let tail_iter = ordered_peeked_word_positions.drain(1..);
|
|
||||||
word_positions_heap.extend(tail_iter);
|
|
||||||
|
|
||||||
// Advance the head and push it in the heap.
|
|
||||||
if let Some(mut head) = ordered_peeked_word_positions.pop() {
|
|
||||||
if let Some(next_position) = head.iter.next() {
|
|
||||||
let prox = positions_proximity(head.position, next_position);
|
|
||||||
|
|
||||||
if prox > 0 && prox < MAX_DISTANCE {
|
|
||||||
word_pair_proximity
|
|
||||||
.entry((head.word.clone(), head.word.clone()))
|
|
||||||
.and_modify(|p| {
|
|
||||||
*p = cmp::min(*p, prox);
|
|
||||||
})
|
|
||||||
.or_insert(prox);
|
|
||||||
}
|
|
||||||
|
|
||||||
word_positions_heap.push(PeekedWordPosition {
|
|
||||||
word: head.word,
|
|
||||||
position: next_position,
|
|
||||||
iter: head.iter,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
for ((w1, w2), prox) in word_pair_proximity {
|
for ((w1, w2), prox) in word_pair_proximity {
|
||||||
key_buffer.clear();
|
key_buffer.clear();
|
||||||
key_buffer.push(prox as u8);
|
key_buffer.push(*prox as u8);
|
||||||
key_buffer.extend_from_slice(w1.as_bytes());
|
key_buffer.extend_from_slice(w1.as_bytes());
|
||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
key_buffer.extend_from_slice(w2.as_bytes());
|
key_buffer.extend_from_slice(w2.as_bytes());
|
||||||
@ -153,6 +119,23 @@ fn document_word_positions_into_sorter(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn word_positions_into_word_pair_proximity(
|
||||||
|
word_positions: &mut Vec<(String, u16)>,
|
||||||
|
word_pair_proximity: &mut HashMap<(String, String), u8>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let (head_word, head_position) = word_positions.remove(0);
|
||||||
|
for (word, position) in word_positions.iter() {
|
||||||
|
let prox = positions_proximity(head_position as u32, *position as u32) as u8;
|
||||||
|
word_pair_proximity
|
||||||
|
.entry((head_word.clone(), word.clone()))
|
||||||
|
.and_modify(|p| {
|
||||||
|
*p = cmp::min(*p, prox);
|
||||||
|
})
|
||||||
|
.or_insert(prox);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
struct PeekedWordPosition<I> {
|
struct PeekedWordPosition<I> {
|
||||||
word: String,
|
word: String,
|
||||||
position: u32,
|
position: u32,
|
||||||
|
@ -1,13 +1,15 @@
|
|||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
|
use obkv::KvReaderU16;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
|
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
||||||
try_split_array_at, GrenadParameters,
|
GrenadParameters,
|
||||||
};
|
};
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Result};
|
use crate::{bucketed_position, DocumentId, Result};
|
||||||
|
|
||||||
/// Extracts the word positions and the documents ids where this word appear.
|
/// Extracts the word positions and the documents ids where this word appear.
|
||||||
///
|
///
|
||||||
@ -34,15 +36,14 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
|||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
let mut cursor = docid_word_positions.into_cursor()?;
|
let mut cursor = docid_word_positions.into_cursor()?;
|
||||||
while let Some((key, value)) = cursor.move_on_next()? {
|
while let Some((key, value)) = cursor.move_on_next()? {
|
||||||
let (document_id_bytes, word_bytes) = try_split_array_at(key)
|
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
|
||||||
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
|
||||||
let document_id = DocumentId::from_be_bytes(document_id_bytes);
|
let document_id = DocumentId::from_be_bytes(document_id_bytes);
|
||||||
|
|
||||||
for position in read_u32_ne_bytes(value) {
|
for (position, word_bytes) in KvReaderU16::new(&value).iter() {
|
||||||
key_buffer.clear();
|
key_buffer.clear();
|
||||||
key_buffer.extend_from_slice(word_bytes);
|
key_buffer.extend_from_slice(word_bytes);
|
||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
let (_, position) = relative_from_absolute_position(position);
|
|
||||||
let position = bucketed_position(position);
|
let position = bucketed_position(position);
|
||||||
key_buffer.extend_from_slice(&position.to_be_bytes());
|
key_buffer.extend_from_slice(&position.to_be_bytes());
|
||||||
word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
|
word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
|
||||||
|
@ -172,15 +172,22 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
"field-id-wordcount-docids",
|
"field-id-wordcount-docids",
|
||||||
);
|
);
|
||||||
|
|
||||||
spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
|
spawn_extraction_task::<
|
||||||
|
_,
|
||||||
|
_,
|
||||||
|
Vec<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)>,
|
||||||
|
>(
|
||||||
docid_word_positions_chunks.clone(),
|
docid_word_positions_chunks.clone(),
|
||||||
indexer,
|
indexer,
|
||||||
lmdb_writer_sx.clone(),
|
lmdb_writer_sx.clone(),
|
||||||
move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
|
move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
|
||||||
merge_roaring_bitmaps,
|
merge_roaring_bitmaps,
|
||||||
|(word_docids_reader, exact_word_docids_reader)| TypedChunk::WordDocids {
|
|(word_docids_reader, exact_word_docids_reader, word_fid_docids_reader)| {
|
||||||
|
TypedChunk::WordDocids {
|
||||||
word_docids_reader,
|
word_docids_reader,
|
||||||
exact_word_docids_reader,
|
exact_word_docids_reader,
|
||||||
|
word_fid_docids_reader,
|
||||||
|
}
|
||||||
},
|
},
|
||||||
"word-docids",
|
"word-docids",
|
||||||
);
|
);
|
||||||
@ -194,15 +201,15 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
TypedChunk::WordPositionDocids,
|
TypedChunk::WordPositionDocids,
|
||||||
"word-position-docids",
|
"word-position-docids",
|
||||||
);
|
);
|
||||||
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
// spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
||||||
docid_word_positions_chunks,
|
// docid_word_positions_chunks,
|
||||||
indexer,
|
// indexer,
|
||||||
lmdb_writer_sx.clone(),
|
// lmdb_writer_sx.clone(),
|
||||||
extract_word_fid_docids,
|
// extract_word_fid_docids,
|
||||||
merge_cbo_roaring_bitmaps,
|
// merge_cbo_roaring_bitmaps,
|
||||||
TypedChunk::WordFidDocids,
|
// TypedChunk::WordFidDocids,
|
||||||
"word-fid-docids",
|
// "word-fid-docids",
|
||||||
);
|
// );
|
||||||
|
|
||||||
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
||||||
docid_fid_facet_strings_chunks,
|
docid_fid_facet_strings_chunks,
|
||||||
|
@ -113,6 +113,22 @@ impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>)> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
|
||||||
|
type Output = (grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>);
|
||||||
|
|
||||||
|
fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
|
||||||
|
let mut m1 = MergerBuilder::new(merge_fn);
|
||||||
|
let mut m2 = MergerBuilder::new(merge_fn);
|
||||||
|
let mut m3 = MergerBuilder::new(merge_fn);
|
||||||
|
for (r1, r2, r3) in self.into_iter() {
|
||||||
|
m1.push(r1)?;
|
||||||
|
m2.push(r2)?;
|
||||||
|
m3.push(r3)?;
|
||||||
|
}
|
||||||
|
Ok((m1.finish(params)?, m2.finish(params)?, m3.finish(params)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);
|
struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);
|
||||||
|
|
||||||
impl<R: io::Read + io::Seek> MergerBuilder<R> {
|
impl<R: io::Read + io::Seek> MergerBuilder<R> {
|
||||||
|
@ -406,13 +406,23 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let typed_chunk = match result? {
|
let typed_chunk = match result? {
|
||||||
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => {
|
TypedChunk::WordDocids {
|
||||||
|
word_docids_reader,
|
||||||
|
exact_word_docids_reader,
|
||||||
|
word_fid_docids_reader,
|
||||||
|
} => {
|
||||||
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
|
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
|
||||||
word_docids = Some(cloneable_chunk);
|
word_docids = Some(cloneable_chunk);
|
||||||
let cloneable_chunk =
|
let cloneable_chunk =
|
||||||
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
|
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
|
||||||
exact_word_docids = Some(cloneable_chunk);
|
exact_word_docids = Some(cloneable_chunk);
|
||||||
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader }
|
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
|
||||||
|
word_fid_docids = Some(cloneable_chunk);
|
||||||
|
TypedChunk::WordDocids {
|
||||||
|
word_docids_reader,
|
||||||
|
exact_word_docids_reader,
|
||||||
|
word_fid_docids_reader,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
TypedChunk::WordPairProximityDocids(chunk) => {
|
TypedChunk::WordPairProximityDocids(chunk) => {
|
||||||
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
|
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
|
||||||
|
@ -32,6 +32,7 @@ pub(crate) enum TypedChunk {
|
|||||||
WordDocids {
|
WordDocids {
|
||||||
word_docids_reader: grenad::Reader<File>,
|
word_docids_reader: grenad::Reader<File>,
|
||||||
exact_word_docids_reader: grenad::Reader<File>,
|
exact_word_docids_reader: grenad::Reader<File>,
|
||||||
|
word_fid_docids_reader: grenad::Reader<File>,
|
||||||
},
|
},
|
||||||
WordPositionDocids(grenad::Reader<File>),
|
WordPositionDocids(grenad::Reader<File>),
|
||||||
WordFidDocids(grenad::Reader<File>),
|
WordFidDocids(grenad::Reader<File>),
|
||||||
@ -64,10 +65,15 @@ impl TypedChunk {
|
|||||||
TypedChunk::NewDocumentsIds(grenad) => {
|
TypedChunk::NewDocumentsIds(grenad) => {
|
||||||
format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len())
|
format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len())
|
||||||
}
|
}
|
||||||
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => format!(
|
TypedChunk::WordDocids {
|
||||||
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {} }}",
|
word_docids_reader,
|
||||||
|
exact_word_docids_reader,
|
||||||
|
word_fid_docids_reader,
|
||||||
|
} => format!(
|
||||||
|
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {}, word_fid_docids_reader: {} }}",
|
||||||
word_docids_reader.len(),
|
word_docids_reader.len(),
|
||||||
exact_word_docids_reader.len()
|
exact_word_docids_reader.len(),
|
||||||
|
word_fid_docids_reader.len()
|
||||||
),
|
),
|
||||||
TypedChunk::WordPositionDocids(grenad) => {
|
TypedChunk::WordPositionDocids(grenad) => {
|
||||||
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
|
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
|
||||||
@ -138,7 +144,11 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
TypedChunk::NewDocumentsIds(documents_ids) => {
|
TypedChunk::NewDocumentsIds(documents_ids) => {
|
||||||
return Ok((documents_ids, is_merged_database))
|
return Ok((documents_ids, is_merged_database))
|
||||||
}
|
}
|
||||||
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => {
|
TypedChunk::WordDocids {
|
||||||
|
word_docids_reader,
|
||||||
|
exact_word_docids_reader,
|
||||||
|
word_fid_docids_reader,
|
||||||
|
} => {
|
||||||
let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?;
|
let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?;
|
||||||
append_entries_into_database(
|
append_entries_into_database(
|
||||||
word_docids_iter.clone(),
|
word_docids_iter.clone(),
|
||||||
@ -159,6 +169,16 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
merge_roaring_bitmaps,
|
merge_roaring_bitmaps,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
|
||||||
|
append_entries_into_database(
|
||||||
|
word_fid_docids_iter,
|
||||||
|
&index.word_fid_docids,
|
||||||
|
wtxn,
|
||||||
|
index_is_empty,
|
||||||
|
|value, _buffer| Ok(value),
|
||||||
|
merge_cbo_roaring_bitmaps,
|
||||||
|
)?;
|
||||||
|
|
||||||
// create fst from word docids
|
// create fst from word docids
|
||||||
let fst = merge_word_docids_reader_into_fst(word_docids_iter, exact_word_docids_iter)?;
|
let fst = merge_word_docids_reader_into_fst(word_docids_iter, exact_word_docids_iter)?;
|
||||||
let db_fst = index.words_fst(wtxn)?;
|
let db_fst = index.words_fst(wtxn)?;
|
||||||
|
Loading…
Reference in New Issue
Block a user