mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 23:04:26 +01:00
Refactor indexation of the "facet-id-exists-docids" database
The idea is to directly create a sorted and merged list of bitmaps in the form of a BTreeMap<FieldId, RoaringBitmap> instead of creating a grenad::Reader where the keys are field_id and the values are docids. Then we send that BTreeMap to the thing that handles TypedChunks, which inserts its content into the database.
This commit is contained in:
parent
1eb1e73bb3
commit
aed8c69bcb
@ -1,16 +1,16 @@
|
|||||||
use std::collections::HashSet;
|
use std::collections::{BTreeMap, HashSet};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
|
||||||
use heed::zerocopy::AsBytes;
|
use heed::zerocopy::AsBytes;
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
||||||
use crate::error::InternalError;
|
use crate::error::InternalError;
|
||||||
use crate::facet::value_encoding::f64_into_bytes;
|
use crate::facet::value_encoding::f64_into_bytes;
|
||||||
use crate::update::index_documents::merge_cbo_roaring_bitmaps;
|
|
||||||
use crate::{DocumentId, FieldId, Result, BEU32};
|
use crate::{DocumentId, FieldId, Result, BEU32};
|
||||||
|
|
||||||
/// Extracts the facet values of each faceted field of each document.
|
/// Extracts the facet values of each faceted field of each document.
|
||||||
@ -22,7 +22,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
obkv_documents: grenad::Reader<R>,
|
obkv_documents: grenad::Reader<R>,
|
||||||
indexer: GrenadParameters,
|
indexer: GrenadParameters,
|
||||||
faceted_fields: &HashSet<FieldId>,
|
faceted_fields: &HashSet<FieldId>,
|
||||||
) -> Result<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
|
) -> Result<(grenad::Reader<File>, grenad::Reader<File>, BTreeMap<FieldId, RoaringBitmap>)> {
|
||||||
let max_memory = indexer.max_memory_by_thread();
|
let max_memory = indexer.max_memory_by_thread();
|
||||||
|
|
||||||
let mut fid_docid_facet_numbers_sorter = create_sorter(
|
let mut fid_docid_facet_numbers_sorter = create_sorter(
|
||||||
@ -30,7 +30,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|m| m / 3),
|
max_memory.map(|m| m / 2),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut fid_docid_facet_strings_sorter = create_sorter(
|
let mut fid_docid_facet_strings_sorter = create_sorter(
|
||||||
@ -38,16 +38,10 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
indexer.max_nb_chunks,
|
indexer.max_nb_chunks,
|
||||||
max_memory.map(|m| m / 3),
|
max_memory.map(|m| m / 2),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut fid_docid_facet_exists_sorter = create_sorter(
|
let mut facet_exists_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
|
||||||
merge_cbo_roaring_bitmaps,
|
|
||||||
indexer.chunk_compression_type,
|
|
||||||
indexer.chunk_compression_level,
|
|
||||||
indexer.max_nb_chunks,
|
|
||||||
max_memory.map(|m| m / 3),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
let mut cursor = obkv_documents.into_cursor()?;
|
let mut cursor = obkv_documents.into_cursor()?;
|
||||||
@ -65,7 +59,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
// Here, we know already that the document must be added to the “field id exists” database
|
// Here, we know already that the document must be added to the “field id exists” database
|
||||||
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
||||||
let document = BEU32::from(document).get();
|
let document = BEU32::from(document).get();
|
||||||
fid_docid_facet_exists_sorter.insert(&key_buffer, document.to_ne_bytes())?;
|
|
||||||
|
facet_exists_docids.entry(field_id).or_default().insert(document);
|
||||||
|
|
||||||
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
||||||
key_buffer.extend_from_slice(&docid_bytes);
|
key_buffer.extend_from_slice(&docid_bytes);
|
||||||
@ -99,7 +94,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
Ok((
|
Ok((
|
||||||
sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?,
|
sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?,
|
||||||
sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?,
|
sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?,
|
||||||
sorter_into_reader(fid_docid_facet_exists_sorter, indexer)?,
|
facet_exists_docids,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,12 +8,13 @@ mod extract_word_docids;
|
|||||||
mod extract_word_pair_proximity_docids;
|
mod extract_word_pair_proximity_docids;
|
||||||
mod extract_word_position_docids;
|
mod extract_word_position_docids;
|
||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::{BTreeMap, HashSet};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::Sender;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use self::extract_docid_word_positions::extract_docid_word_positions;
|
use self::extract_docid_word_positions::extract_docid_word_positions;
|
||||||
use self::extract_facet_number_docids::extract_facet_number_docids;
|
use self::extract_facet_number_docids::extract_facet_number_docids;
|
||||||
@ -72,12 +73,24 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
|
|
||||||
let (
|
let (
|
||||||
docid_word_positions_chunks,
|
docid_word_positions_chunks,
|
||||||
(
|
(docid_fid_facet_numbers_chunks, (docid_fid_facet_strings_chunks, facet_exists_docids)),
|
||||||
docid_fid_facet_numbers_chunks,
|
|
||||||
(docid_fid_facet_strings_chunks, docid_fid_facet_exists_chunks),
|
|
||||||
),
|
|
||||||
) = result?;
|
) = result?;
|
||||||
|
|
||||||
|
// merge facet_exists_docids hashmaps and send them as a typed chunk
|
||||||
|
{
|
||||||
|
let lmdb_writer_sx = lmdb_writer_sx.clone();
|
||||||
|
rayon::spawn(move || {
|
||||||
|
let mut all = BTreeMap::default();
|
||||||
|
for facet_exists_docids in facet_exists_docids {
|
||||||
|
for (field_id, docids) in facet_exists_docids {
|
||||||
|
let docids0 = all.entry(field_id).or_default();
|
||||||
|
*docids0 |= docids;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(all)));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
|
||||||
docid_word_positions_chunks.clone(),
|
docid_word_positions_chunks.clone(),
|
||||||
indexer.clone(),
|
indexer.clone(),
|
||||||
@ -141,12 +154,6 @@ pub(crate) fn data_from_obkv_documents(
|
|||||||
"field-id-facet-number-docids",
|
"field-id-facet-number-docids",
|
||||||
);
|
);
|
||||||
|
|
||||||
// spawn extraction task for field-id-facet-exists-docids
|
|
||||||
rayon::spawn(move || {
|
|
||||||
let reader = docid_fid_facet_exists_chunks.merge(merge_cbo_roaring_bitmaps, &indexer);
|
|
||||||
let _ = lmdb_writer_sx.send(reader.map(TypedChunk::FieldIdFacetExistsDocids));
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -221,7 +228,7 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
grenad::Reader<CursorClonableMmap>,
|
grenad::Reader<CursorClonableMmap>,
|
||||||
(
|
(
|
||||||
grenad::Reader<CursorClonableMmap>,
|
grenad::Reader<CursorClonableMmap>,
|
||||||
(grenad::Reader<CursorClonableMmap>, grenad::Reader<File>),
|
(grenad::Reader<CursorClonableMmap>, BTreeMap<FieldId, RoaringBitmap>),
|
||||||
),
|
),
|
||||||
)> {
|
)> {
|
||||||
let flattened_documents_chunk =
|
let flattened_documents_chunk =
|
||||||
@ -266,7 +273,7 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
let (
|
let (
|
||||||
docid_fid_facet_numbers_chunk,
|
docid_fid_facet_numbers_chunk,
|
||||||
docid_fid_facet_strings_chunk,
|
docid_fid_facet_strings_chunk,
|
||||||
docid_fid_facet_exists_chunk,
|
facet_exists_docids,
|
||||||
) = extract_fid_docid_facet_values(
|
) = extract_fid_docid_facet_values(
|
||||||
flattened_documents_chunk.clone(),
|
flattened_documents_chunk.clone(),
|
||||||
indexer.clone(),
|
indexer.clone(),
|
||||||
@ -291,7 +298,7 @@ fn send_and_extract_flattened_documents_data(
|
|||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
docid_fid_facet_numbers_chunk,
|
docid_fid_facet_numbers_chunk,
|
||||||
(docid_fid_facet_strings_chunk, docid_fid_facet_exists_chunk),
|
(docid_fid_facet_strings_chunk, facet_exists_docids),
|
||||||
))
|
))
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use grenad::MergerBuilder;
|
use grenad::MergerBuilder;
|
||||||
use heed::types::ByteSlice;
|
use heed::types::ByteSlice;
|
||||||
use heed::{BytesDecode, RwTxn};
|
use heed::{BytesDecode, BytesEncode, RwTxn};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
@ -16,8 +17,8 @@ use super::{ClonableMmap, MergeFn};
|
|||||||
use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string};
|
use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string};
|
||||||
use crate::update::index_documents::helpers::as_cloneable_grenad;
|
use crate::update::index_documents::helpers::as_cloneable_grenad;
|
||||||
use crate::{
|
use crate::{
|
||||||
lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
|
error, lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, FieldId,
|
||||||
Result,
|
GeoPoint, Index, Result, BEU16,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) enum TypedChunk {
|
pub(crate) enum TypedChunk {
|
||||||
@ -35,7 +36,7 @@ pub(crate) enum TypedChunk {
|
|||||||
WordPairProximityDocids(grenad::Reader<File>),
|
WordPairProximityDocids(grenad::Reader<File>),
|
||||||
FieldIdFacetStringDocids(grenad::Reader<File>),
|
FieldIdFacetStringDocids(grenad::Reader<File>),
|
||||||
FieldIdFacetNumberDocids(grenad::Reader<File>),
|
FieldIdFacetNumberDocids(grenad::Reader<File>),
|
||||||
FieldIdFacetExistsDocids(grenad::Reader<File>),
|
FieldIdFacetExistsDocids(BTreeMap<FieldId, RoaringBitmap>),
|
||||||
GeoPoints(grenad::Reader<File>),
|
GeoPoints(grenad::Reader<File>),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,16 +148,14 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
)?;
|
)?;
|
||||||
is_merged_database = true;
|
is_merged_database = true;
|
||||||
}
|
}
|
||||||
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids_iter) => {
|
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
|
||||||
append_entries_into_database(
|
write_sorted_iterator_into_database(
|
||||||
facet_id_exists_docids_iter,
|
facet_id_exists_docids.into_iter().map(|(k, v)| (BEU16::new(k), v)),
|
||||||
&index.facet_id_exists_docids,
|
&index.facet_id_exists_docids,
|
||||||
|
"facet-id-exists-docids",
|
||||||
wtxn,
|
wtxn,
|
||||||
index_is_empty,
|
|
||||||
|value, _buffer| Ok(value),
|
|
||||||
merge_cbo_roaring_bitmaps,
|
merge_cbo_roaring_bitmaps,
|
||||||
)
|
)?;
|
||||||
.unwrap();
|
|
||||||
is_merged_database = true;
|
is_merged_database = true;
|
||||||
}
|
}
|
||||||
TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => {
|
TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => {
|
||||||
@ -270,6 +269,58 @@ fn merge_cbo_roaring_bitmaps(
|
|||||||
)?)
|
)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn write_sorted_iterator_into_database<Iter, Key, Value, KeyCodec, ValueCodec, Merge>(
|
||||||
|
mut iterator: Iter,
|
||||||
|
database: &heed::Database<KeyCodec, ValueCodec>,
|
||||||
|
database_name: &'static str,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
merge_values: Merge,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
for<'a> KeyCodec: BytesEncode<'a, EItem = Key>,
|
||||||
|
for<'a> ValueCodec: BytesEncode<'a, EItem = Value> + BytesDecode<'a, DItem = Value>,
|
||||||
|
Iter: Iterator<Item = (Key, Value)>,
|
||||||
|
Merge: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
|
||||||
|
{
|
||||||
|
if database.is_empty(wtxn)? {
|
||||||
|
let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
|
||||||
|
|
||||||
|
while let Some((key, value)) = iterator.next() {
|
||||||
|
let key = KeyCodec::bytes_encode(&key)
|
||||||
|
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
|
||||||
|
if valid_lmdb_key(&key) {
|
||||||
|
let value = ValueCodec::bytes_encode(&value)
|
||||||
|
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
|
||||||
|
unsafe { database.append(&key, &value)? };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
let database = database.remap_types::<ByteSlice, ByteSlice>();
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
while let Some((key, value)) = iterator.next() {
|
||||||
|
let key = KeyCodec::bytes_encode(&key)
|
||||||
|
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
|
||||||
|
if valid_lmdb_key(&key) {
|
||||||
|
let value = ValueCodec::bytes_encode(&value)
|
||||||
|
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
|
||||||
|
let value = match database.get(wtxn, &key)? {
|
||||||
|
Some(prev_value) => {
|
||||||
|
merge_values(&value, &prev_value, &mut buffer)?;
|
||||||
|
&buffer[..]
|
||||||
|
}
|
||||||
|
None => &value,
|
||||||
|
};
|
||||||
|
|
||||||
|
database.put(wtxn, &key, value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Write provided entries in database using serialize_value function.
|
/// Write provided entries in database using serialize_value function.
|
||||||
/// merge_values function is used if an entry already exist in the database.
|
/// merge_values function is used if an entry already exist in the database.
|
||||||
fn write_entries_into_database<R, K, V, FS, FM>(
|
fn write_entries_into_database<R, K, V, FS, FM>(
|
||||||
|
Loading…
Reference in New Issue
Block a user