Simplify indexing task for facet_exists_docids database

This commit is contained in:
Loïc Lecrenier 2022-06-16 08:24:16 +02:00
parent 392472f4bb
commit 30bd4db0fc
6 changed files with 50 additions and 105 deletions

View File

@ -0,0 +1,25 @@
use crate::{FieldId, BEU16};
use heed::zerocopy::AsBytes;
use std::{borrow::Cow, convert::TryInto};
pub struct FieldIdCodec;
impl<'a> heed::BytesDecode<'a> for FieldIdCodec {
type DItem = FieldId;
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let bytes: [u8; 2] = bytes[..2].try_into().ok()?;
let field_id = BEU16::from(bytes).get();
Some(field_id)
}
}
impl<'a> heed::BytesEncode<'a> for FieldIdCodec {
type EItem = FieldId;
fn bytes_encode(field_id: &Self::EItem) -> Option<Cow<[u8]>> {
let field_id = BEU16::new(*field_id);
let bytes = field_id.as_bytes();
Some(Cow::Owned(bytes.to_vec()))
}
}

View File

@ -5,6 +5,7 @@ mod facet_string_level_zero_value_codec;
mod facet_string_zero_bounds_value_codec; mod facet_string_zero_bounds_value_codec;
mod field_doc_id_facet_f64_codec; mod field_doc_id_facet_f64_codec;
mod field_doc_id_facet_string_codec; mod field_doc_id_facet_string_codec;
mod field_id_codec;
pub use self::facet_level_value_f64_codec::FacetLevelValueF64Codec; pub use self::facet_level_value_f64_codec::FacetLevelValueF64Codec;
pub use self::facet_level_value_u32_codec::FacetLevelValueU32Codec; pub use self::facet_level_value_u32_codec::FacetLevelValueU32Codec;
@ -15,6 +16,7 @@ pub use self::facet_string_level_zero_value_codec::{
pub use self::facet_string_zero_bounds_value_codec::FacetStringZeroBoundsValueCodec; pub use self::facet_string_zero_bounds_value_codec::FacetStringZeroBoundsValueCodec;
pub use self::field_doc_id_facet_f64_codec::FieldDocIdFacetF64Codec; pub use self::field_doc_id_facet_f64_codec::FieldDocIdFacetF64Codec;
pub use self::field_doc_id_facet_string_codec::FieldDocIdFacetStringCodec; pub use self::field_doc_id_facet_string_codec::FieldDocIdFacetStringCodec;
pub use self::field_id_codec::FieldIdCodec;
/// Tries to split a slice in half at the given middle point, /// Tries to split a slice in half at the given middle point,
/// `None` if the slice is too short. /// `None` if the slice is too short.
@ -25,44 +27,3 @@ pub fn try_split_at(slice: &[u8], mid: usize) -> Option<(&[u8], &[u8])> {
None None
} }
} }
use std::borrow::Cow;
use std::convert::TryInto;
use crate::{try_split_array_at, DocumentId, FieldId};
pub struct FieldIdCodec;
impl<'a> heed::BytesDecode<'a> for FieldIdCodec {
type DItem = FieldId;
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let (field_id_bytes, _) = try_split_array_at(bytes)?;
let field_id = u16::from_be_bytes(field_id_bytes);
Some(field_id)
}
}
impl<'a> heed::BytesEncode<'a> for FieldIdCodec {
type EItem = FieldId;
fn bytes_encode(field_id: &Self::EItem) -> Option<Cow<[u8]>> {
Some(Cow::Owned(field_id.to_be_bytes().to_vec()))
}
}
pub struct FieldIdDocIdCodec;
impl<'a> heed::BytesDecode<'a> for FieldIdDocIdCodec {
type DItem = (FieldId, DocumentId);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let (field_id_bytes, bytes) = try_split_array_at(bytes)?;
let field_id = u16::from_be_bytes(field_id_bytes);
let document_id_bytes = bytes[..4].try_into().ok()?;
let document_id = u32::from_be_bytes(document_id_bytes);
Some((field_id, document_id))
}
}

View File

@ -49,6 +49,7 @@ pub type SmallString32 = smallstr::SmallString<[u8; 32]>;
pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>; pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>;
pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>; pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
pub type SmallVec8<T> = smallvec::SmallVec<[T; 8]>; pub type SmallVec8<T> = smallvec::SmallVec<[T; 8]>;
pub type BEU16 = heed::zerocopy::U16<heed::byteorder::BE>;
pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>; pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>;
pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>; pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
pub type Attribute = u32; pub type Attribute = u32;

View File

@ -1,40 +0,0 @@
use std::fs::File;
use std::io;
use heed::{BytesDecode, BytesEncode};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
};
use crate::heed_codec::facet::{FieldIdCodec, FieldIdDocIdCodec};
use crate::Result;
/// Extracts the documents ids where this field appears.
///
/// Returns a grenad reader whose key is the field id encoded
/// with `FieldIdCodec` and the value is a document_id (u32)
/// encoded as native-endian bytes.
#[logging_timer::time]
pub fn extract_facet_exists_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
let mut facet_exists_docids_sorter = create_sorter(
merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
let mut cursor = docid_fid_facet_number.into_cursor()?;
while let Some((key_bytes, _)) = cursor.move_on_next()? {
let (field_id, document_id) = FieldIdDocIdCodec::bytes_decode(key_bytes).unwrap();
let key_bytes = FieldIdCodec::bytes_encode(&field_id).unwrap();
facet_exists_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?;
}
sorter_into_reader(facet_exists_docids_sorter, indexer)
}

View File

@ -1,15 +1,16 @@
use heed::zerocopy::AsBytes;
use serde_json::Value;
use std::collections::HashSet; use std::collections::HashSet;
use std::convert::TryInto;
use std::fs::File; use std::fs::File;
use std::io; use std::io;
use std::mem::size_of; use std::mem::size_of;
use heed::zerocopy::AsBytes;
use serde_json::Value;
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
use crate::error::InternalError; use crate::error::InternalError;
use crate::facet::value_encoding::f64_into_bytes; use crate::facet::value_encoding::f64_into_bytes;
use crate::{DocumentId, FieldId, Result}; use crate::update::index_documents::merge_cbo_roaring_bitmaps;
use crate::{DocumentId, FieldId, Result, BEU32};
/// Extracts the facet values of each faceted field of each document. /// Extracts the facet values of each faceted field of each document.
/// ///
@ -40,7 +41,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
); );
let mut fid_docid_facet_exists_sorter = create_sorter( let mut fid_docid_facet_exists_sorter = create_sorter(
keep_first, merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -56,12 +57,17 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
if faceted_fields.contains(&field_id) { if faceted_fields.contains(&field_id) {
key_buffer.clear(); key_buffer.clear();
// here, we know already that the document must be added to the “field id exists” database // Set key to the field_id
// prefix key with the field_id and the document_id // Note: this encoding is consistent with FieldIdCodec
key_buffer.extend_from_slice(&field_id.to_be_bytes()); key_buffer.extend_from_slice(&field_id.to_be_bytes());
// Here, we know already that the document must be added to the “field id exists” database
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
let document = BEU32::from(document).get();
fid_docid_facet_exists_sorter.insert(&key_buffer, document.to_ne_bytes())?;
// For the other extraction tasks, prefix the key with the field_id and the document_id
key_buffer.extend_from_slice(&docid_bytes); key_buffer.extend_from_slice(&docid_bytes);
fid_docid_facet_exists_sorter.insert(&key_buffer, ().as_bytes())?;
let value = let value =
serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;

View File

@ -1,5 +1,4 @@
mod extract_docid_word_positions; mod extract_docid_word_positions;
mod extract_facet_exists_docids;
mod extract_facet_number_docids; mod extract_facet_number_docids;
mod extract_facet_string_docids; mod extract_facet_string_docids;
mod extract_fid_docid_facet_values; mod extract_fid_docid_facet_values;
@ -17,7 +16,6 @@ use log::debug;
use rayon::prelude::*; use rayon::prelude::*;
use self::extract_docid_word_positions::extract_docid_word_positions; use self::extract_docid_word_positions::extract_docid_word_positions;
use self::extract_facet_exists_docids::extract_facet_exists_docids;
use self::extract_facet_number_docids::extract_facet_number_docids; use self::extract_facet_number_docids::extract_facet_number_docids;
use self::extract_facet_string_docids::extract_facet_string_docids; use self::extract_facet_string_docids::extract_facet_string_docids;
use self::extract_fid_docid_facet_values::extract_fid_docid_facet_values; use self::extract_fid_docid_facet_values::extract_fid_docid_facet_values;
@ -142,15 +140,12 @@ pub(crate) fn data_from_obkv_documents(
TypedChunk::FieldIdFacetNumberDocids, TypedChunk::FieldIdFacetNumberDocids,
"field-id-facet-number-docids", "field-id-facet-number-docids",
); );
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
docid_fid_facet_exists_chunks.clone(), // spawn extraction task for field-id-facet-exists-docids
indexer.clone(), rayon::spawn(move || {
lmdb_writer_sx.clone(), let reader = docid_fid_facet_exists_chunks.merge(merge_cbo_roaring_bitmaps, &indexer);
extract_facet_exists_docids, let _ = lmdb_writer_sx.send(reader.map(TypedChunk::FieldIdFacetExistsDocids));
merge_cbo_roaring_bitmaps, });
TypedChunk::FieldIdFacetExistsDocids,
"field-id-facet-exists-docids",
);
Ok(()) Ok(())
} }
@ -226,7 +221,7 @@ fn send_and_extract_flattened_documents_data(
grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>,
( (
grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>,
(grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>), (grenad::Reader<CursorClonableMmap>, grenad::Reader<File>),
), ),
)> { )> {
let flattened_documents_chunk = let flattened_documents_chunk =
@ -294,9 +289,6 @@ fn send_and_extract_flattened_documents_data(
docid_fid_facet_strings_chunk.clone(), docid_fid_facet_strings_chunk.clone(),
))); )));
let docid_fid_facet_exists_chunk =
unsafe { as_cloneable_grenad(&docid_fid_facet_exists_chunk)? };
Ok(( Ok((
docid_fid_facet_numbers_chunk, docid_fid_facet_numbers_chunk,
(docid_fid_facet_strings_chunk, docid_fid_facet_exists_chunk), (docid_fid_facet_strings_chunk, docid_fid_facet_exists_chunk),