2021-08-16 13:36:30 +02:00
|
|
|
mod extract_docid_word_positions;
|
|
|
|
mod extract_facet_number_docids;
|
|
|
|
mod extract_facet_string_docids;
|
|
|
|
mod extract_fid_docid_facet_values;
|
|
|
|
mod extract_fid_word_count_docids;
|
2021-08-23 18:41:48 +02:00
|
|
|
mod extract_geo_points;
|
2023-06-08 11:35:36 +02:00
|
|
|
mod extract_vector_points;
|
2021-08-16 13:36:30 +02:00
|
|
|
mod extract_word_docids;
|
|
|
|
mod extract_word_pair_proximity_docids;
|
2021-10-05 11:18:42 +02:00
|
|
|
mod extract_word_position_docids;
|
2021-08-16 13:36:30 +02:00
|
|
|
|
2022-07-19 14:42:35 +02:00
|
|
|
use std::collections::HashSet;
|
2021-08-16 13:36:30 +02:00
|
|
|
use std::fs::File;
|
2023-09-28 16:26:01 +02:00
|
|
|
use std::io::BufReader;
|
2021-08-16 13:36:30 +02:00
|
|
|
|
|
|
|
use crossbeam_channel::Sender;
|
|
|
|
use rayon::prelude::*;
|
2024-02-08 10:14:50 +01:00
|
|
|
use tracing::debug;
|
2021-08-16 13:36:30 +02:00
|
|
|
|
|
|
|
use self::extract_docid_word_positions::extract_docid_word_positions;
|
|
|
|
use self::extract_facet_number_docids::extract_facet_number_docids;
|
|
|
|
use self::extract_facet_string_docids::extract_facet_string_docids;
|
2023-03-09 13:21:21 +01:00
|
|
|
use self::extract_fid_docid_facet_values::{extract_fid_docid_facet_values, ExtractedFacetValues};
|
2021-08-16 13:36:30 +02:00
|
|
|
use self::extract_fid_word_count_docids::extract_fid_word_count_docids;
|
2021-08-23 18:41:48 +02:00
|
|
|
use self::extract_geo_points::extract_geo_points;
|
2023-11-15 15:46:37 +01:00
|
|
|
use self::extract_vector_points::{
|
|
|
|
extract_embeddings, extract_vector_points, ExtractedVectorPoints,
|
|
|
|
};
|
2021-08-16 13:36:30 +02:00
|
|
|
use self::extract_word_docids::extract_word_docids;
|
|
|
|
use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
|
2023-04-05 14:55:02 +02:00
|
|
|
use self::extract_word_position_docids::extract_word_position_docids;
|
2021-08-16 13:36:30 +02:00
|
|
|
use super::helpers::{
|
2023-10-25 14:15:06 +02:00
|
|
|
as_cloneable_grenad, merge_deladd_cbo_roaring_bitmaps, CursorClonableMmap, GrenadParameters,
|
|
|
|
MergeFn, MergeableReader,
|
2021-08-16 13:36:30 +02:00
|
|
|
};
|
|
|
|
use super::{helpers, TypedChunk};
|
2023-12-06 15:49:02 +01:00
|
|
|
use crate::proximity::ProximityPrecision;
|
2023-12-13 15:38:44 +01:00
|
|
|
use crate::vector::EmbeddingConfigs;
|
2023-11-15 15:46:37 +01:00
|
|
|
use crate::{FieldId, FieldsIdsMap, Result};
|
2021-08-16 13:36:30 +02:00
|
|
|
|
|
|
|
/// Extract data for each databases from obkv documents in parallel.
|
|
|
|
/// Send data in grenad file over provided Sender.
|
2022-10-14 16:44:10 +02:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2024-01-23 09:42:48 +01:00
|
|
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
|
2021-08-16 13:36:30 +02:00
|
|
|
pub(crate) fn data_from_obkv_documents(
|
2023-09-28 16:26:01 +02:00
|
|
|
original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
|
|
|
|
flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
|
2021-08-16 13:36:30 +02:00
|
|
|
indexer: GrenadParameters,
|
2021-08-24 13:01:31 +02:00
|
|
|
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
2021-08-16 13:36:30 +02:00
|
|
|
searchable_fields: Option<HashSet<FieldId>>,
|
|
|
|
faceted_fields: HashSet<FieldId>,
|
2021-09-02 15:57:40 +02:00
|
|
|
primary_key_id: FieldId,
|
2022-03-23 17:28:41 +01:00
|
|
|
geo_fields_ids: Option<(FieldId, FieldId)>,
|
2023-11-15 15:46:37 +01:00
|
|
|
field_id_map: FieldsIdsMap,
|
2021-08-17 12:25:07 +02:00
|
|
|
stop_words: Option<fst::Set<&[u8]>>,
|
2023-08-10 10:44:07 +02:00
|
|
|
allowed_separators: Option<&[&str]>,
|
|
|
|
dictionary: Option<&[&str]>,
|
2021-10-06 12:11:07 +02:00
|
|
|
max_positions_per_attributes: Option<u32>,
|
2022-03-24 17:00:29 +01:00
|
|
|
exact_attributes: HashSet<FieldId>,
|
2023-12-06 15:49:02 +01:00
|
|
|
proximity_precision: ProximityPrecision,
|
2023-12-13 15:38:44 +01:00
|
|
|
embedders: EmbeddingConfigs,
|
2021-08-16 13:36:30 +02:00
|
|
|
) -> Result<()> {
|
2023-07-10 18:41:54 +02:00
|
|
|
puffin::profile_function!();
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
original_obkv_chunks
|
2021-08-16 13:36:30 +02:00
|
|
|
.par_bridge()
|
2022-03-23 17:28:41 +01:00
|
|
|
.map(|original_documents_chunk| {
|
2023-09-06 12:20:25 +02:00
|
|
|
send_original_documents_data(
|
|
|
|
original_documents_chunk,
|
|
|
|
indexer,
|
|
|
|
lmdb_writer_sx.clone(),
|
2023-11-15 15:46:37 +01:00
|
|
|
field_id_map.clone(),
|
|
|
|
embedders.clone(),
|
2023-09-06 12:20:25 +02:00
|
|
|
)
|
2022-03-23 17:28:41 +01:00
|
|
|
})
|
|
|
|
.collect::<Result<()>>()?;
|
|
|
|
|
2022-10-14 16:44:10 +02:00
|
|
|
#[allow(clippy::type_complexity)]
|
2023-03-14 18:08:12 +01:00
|
|
|
let result: Result<(Vec<_>, (Vec<_>, (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>)))))> =
|
|
|
|
flattened_obkv_chunks
|
|
|
|
.par_bridge()
|
|
|
|
.map(|flattened_obkv_chunks| {
|
|
|
|
send_and_extract_flattened_documents_data(
|
|
|
|
flattened_obkv_chunks,
|
|
|
|
indexer,
|
|
|
|
lmdb_writer_sx.clone(),
|
|
|
|
&searchable_fields,
|
|
|
|
&faceted_fields,
|
|
|
|
primary_key_id,
|
|
|
|
geo_fields_ids,
|
|
|
|
&stop_words,
|
2023-07-24 18:35:20 +02:00
|
|
|
&allowed_separators,
|
|
|
|
&dictionary,
|
2023-03-14 18:08:12 +01:00
|
|
|
max_positions_per_attributes,
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.collect();
|
2021-08-16 13:36:30 +02:00
|
|
|
|
|
|
|
let (
|
2021-08-25 16:59:38 +02:00
|
|
|
docid_word_positions_chunks,
|
2022-07-19 14:42:35 +02:00
|
|
|
(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_numbers_chunks,
|
2023-03-08 16:46:42 +01:00
|
|
|
(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_strings_chunks,
|
2023-03-14 18:08:12 +01:00
|
|
|
(
|
|
|
|
facet_is_null_docids_chunks,
|
|
|
|
(facet_is_empty_docids_chunks, facet_exists_docids_chunks),
|
|
|
|
),
|
2023-03-08 16:46:42 +01:00
|
|
|
),
|
2022-07-19 14:42:35 +02:00
|
|
|
),
|
2021-08-16 13:36:30 +02:00
|
|
|
) = result?;
|
|
|
|
|
2022-07-19 14:42:35 +02:00
|
|
|
// merge facet_exists_docids and send them as a typed chunk
|
2022-07-19 09:57:28 +02:00
|
|
|
{
|
|
|
|
let lmdb_writer_sx = lmdb_writer_sx.clone();
|
|
|
|
rayon::spawn(move || {
|
2024-02-08 10:14:50 +01:00
|
|
|
debug!(database = "facet-id-exists-docids", "merge");
|
2023-10-25 14:15:06 +02:00
|
|
|
match facet_exists_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) {
|
2022-07-19 14:42:35 +02:00
|
|
|
Ok(reader) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(reader)));
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Err(e));
|
2022-07-19 09:57:28 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2023-03-08 16:49:53 +01:00
|
|
|
// merge facet_is_null_docids and send them as a typed chunk
|
|
|
|
{
|
|
|
|
let lmdb_writer_sx = lmdb_writer_sx.clone();
|
|
|
|
rayon::spawn(move || {
|
2024-02-08 10:14:50 +01:00
|
|
|
debug!(database = "facet-id-is-null-docids", "merge");
|
2023-10-25 14:15:06 +02:00
|
|
|
match facet_is_null_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) {
|
2023-03-08 16:49:53 +01:00
|
|
|
Ok(reader) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsNullDocids(reader)));
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Err(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2023-03-14 18:08:12 +01:00
|
|
|
// merge facet_is_empty_docids and send them as a typed chunk
|
|
|
|
{
|
|
|
|
let lmdb_writer_sx = lmdb_writer_sx.clone();
|
|
|
|
rayon::spawn(move || {
|
2024-02-08 10:14:50 +01:00
|
|
|
debug!(database = "facet-id-is-empty-docids", "merge");
|
2023-10-25 14:15:06 +02:00
|
|
|
match facet_is_empty_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) {
|
2023-03-14 18:08:12 +01:00
|
|
|
Ok(reader) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsEmptyDocids(reader)));
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Err(e));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2023-12-14 16:31:00 +01:00
|
|
|
if proximity_precision == ProximityPrecision::ByWord {
|
2023-12-06 15:49:02 +01:00
|
|
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
|
|
|
|
docid_word_positions_chunks.clone(),
|
|
|
|
indexer,
|
|
|
|
lmdb_writer_sx.clone(),
|
|
|
|
extract_word_pair_proximity_docids,
|
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
|
|
|
TypedChunk::WordPairProximityDocids,
|
|
|
|
"word-pair-proximity-docids",
|
|
|
|
);
|
|
|
|
}
|
2021-08-16 13:36:30 +02:00
|
|
|
|
2023-09-28 16:26:01 +02:00
|
|
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
|
2021-08-16 13:36:30 +02:00
|
|
|
docid_word_positions_chunks.clone(),
|
2022-10-13 22:02:54 +02:00
|
|
|
indexer,
|
2021-08-16 13:36:30 +02:00
|
|
|
lmdb_writer_sx.clone(),
|
|
|
|
extract_fid_word_count_docids,
|
2023-10-25 14:15:06 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2023-10-19 10:38:58 +02:00
|
|
|
TypedChunk::FieldIdWordCountDocids,
|
2021-08-16 13:36:30 +02:00
|
|
|
"field-id-wordcount-docids",
|
|
|
|
);
|
|
|
|
|
2023-09-28 16:26:01 +02:00
|
|
|
spawn_extraction_task::<
|
|
|
|
_,
|
|
|
|
_,
|
2023-09-18 09:59:38 +02:00
|
|
|
Vec<(
|
|
|
|
grenad::Reader<BufReader<File>>,
|
|
|
|
grenad::Reader<BufReader<File>>,
|
|
|
|
grenad::Reader<BufReader<File>>,
|
|
|
|
)>,
|
2023-09-28 16:26:01 +02:00
|
|
|
>(
|
2021-08-16 13:36:30 +02:00
|
|
|
docid_word_positions_chunks.clone(),
|
2022-10-13 22:02:54 +02:00
|
|
|
indexer,
|
2021-08-16 13:36:30 +02:00
|
|
|
lmdb_writer_sx.clone(),
|
2022-03-24 17:00:29 +01:00
|
|
|
move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
|
2023-10-25 14:15:06 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2023-09-18 09:59:38 +02:00
|
|
|
|(word_docids_reader, exact_word_docids_reader, word_fid_docids_reader)| {
|
|
|
|
TypedChunk::WordDocids {
|
|
|
|
word_docids_reader,
|
|
|
|
exact_word_docids_reader,
|
|
|
|
word_fid_docids_reader,
|
|
|
|
}
|
2022-03-24 15:22:57 +01:00
|
|
|
},
|
2021-08-16 13:36:30 +02:00
|
|
|
"word-docids",
|
|
|
|
);
|
|
|
|
|
2023-09-28 16:26:01 +02:00
|
|
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
|
2023-04-05 14:55:02 +02:00
|
|
|
docid_word_positions_chunks.clone(),
|
2022-10-13 22:02:54 +02:00
|
|
|
indexer,
|
2021-08-16 13:36:30 +02:00
|
|
|
lmdb_writer_sx.clone(),
|
2023-04-05 14:55:02 +02:00
|
|
|
extract_word_position_docids,
|
2023-10-25 14:15:06 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-10-05 11:18:42 +02:00
|
|
|
TypedChunk::WordPositionDocids,
|
|
|
|
"word-position-docids",
|
2021-08-16 13:36:30 +02:00
|
|
|
);
|
|
|
|
|
2023-09-28 16:26:01 +02:00
|
|
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_strings_chunks,
|
2022-10-13 22:02:54 +02:00
|
|
|
indexer,
|
2021-08-16 13:36:30 +02:00
|
|
|
lmdb_writer_sx.clone(),
|
|
|
|
extract_facet_string_docids,
|
2023-10-25 14:15:06 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-08-16 13:36:30 +02:00
|
|
|
TypedChunk::FieldIdFacetStringDocids,
|
|
|
|
"field-id-facet-string-docids",
|
|
|
|
);
|
|
|
|
|
2023-09-28 16:26:01 +02:00
|
|
|
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_numbers_chunks,
|
2022-10-13 22:02:54 +02:00
|
|
|
indexer,
|
2022-10-24 21:34:13 +02:00
|
|
|
lmdb_writer_sx,
|
2021-08-16 13:36:30 +02:00
|
|
|
extract_facet_number_docids,
|
2023-10-25 14:15:06 +02:00
|
|
|
merge_deladd_cbo_roaring_bitmaps,
|
2021-08-16 13:36:30 +02:00
|
|
|
TypedChunk::FieldIdFacetNumberDocids,
|
2024-02-08 14:55:36 +01:00
|
|
|
"field-id-facet-number-docids",
|
2021-08-16 13:36:30 +02:00
|
|
|
);
|
2022-06-16 08:24:16 +02:00
|
|
|
|
2021-08-16 13:36:30 +02:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Spawn a new task to extract data for a specific DB using extract_fn.
|
|
|
|
/// Generated grenad chunks are merged using the merge_fn.
|
|
|
|
/// The result of merged chunks is serialized as TypedChunk using the serialize_fn
|
|
|
|
/// and sent into lmdb_writer_sx.
|
2022-03-23 14:48:15 +01:00
|
|
|
fn spawn_extraction_task<FE, FS, M>(
|
2021-08-16 13:36:30 +02:00
|
|
|
chunks: Vec<grenad::Reader<CursorClonableMmap>>,
|
|
|
|
indexer: GrenadParameters,
|
2021-08-24 13:01:31 +02:00
|
|
|
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
2021-08-16 13:36:30 +02:00
|
|
|
extract_fn: FE,
|
|
|
|
merge_fn: MergeFn,
|
|
|
|
serialize_fn: FS,
|
|
|
|
name: &'static str,
|
|
|
|
) where
|
2022-03-23 14:48:15 +01:00
|
|
|
FE: Fn(grenad::Reader<CursorClonableMmap>, GrenadParameters) -> Result<M::Output>
|
2021-08-16 13:36:30 +02:00
|
|
|
+ Sync
|
|
|
|
+ Send
|
|
|
|
+ 'static,
|
2022-03-23 14:48:15 +01:00
|
|
|
FS: Fn(M::Output) -> TypedChunk + Sync + Send + 'static,
|
|
|
|
M: MergeableReader + FromParallelIterator<M::Output> + Send + 'static,
|
|
|
|
M::Output: Send,
|
2021-08-16 13:36:30 +02:00
|
|
|
{
|
2024-01-23 09:42:48 +01:00
|
|
|
let current_span = tracing::Span::current();
|
|
|
|
|
2021-08-16 13:36:30 +02:00
|
|
|
rayon::spawn(move || {
|
2024-02-08 10:14:50 +01:00
|
|
|
let child_span =
|
|
|
|
tracing::trace_span!(target: "", parent: ¤t_span, "extract_multiple_chunks");
|
2024-01-23 09:42:48 +01:00
|
|
|
let _entered = child_span.enter();
|
2024-02-08 10:14:50 +01:00
|
|
|
puffin::profile_scope!("extract_multiple_chunksdexing::details, ", name);
|
2022-03-23 14:48:15 +01:00
|
|
|
let chunks: Result<M> =
|
2022-10-13 22:02:54 +02:00
|
|
|
chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect();
|
2024-01-23 09:42:48 +01:00
|
|
|
let current_span = tracing::Span::current();
|
|
|
|
|
2021-08-24 13:01:31 +02:00
|
|
|
rayon::spawn(move || match chunks {
|
|
|
|
Ok(chunks) => {
|
2024-02-08 10:14:50 +01:00
|
|
|
let child_span = tracing::trace_span!(target: "", parent: ¤t_span, "merge_multiple_chunks");
|
2024-01-23 09:42:48 +01:00
|
|
|
let _entered = child_span.enter();
|
2024-02-08 10:14:50 +01:00
|
|
|
debug!(database = name, "merge");
|
2023-07-10 18:41:54 +02:00
|
|
|
puffin::profile_scope!("merge_multiple_chunks", name);
|
2022-03-23 14:48:15 +01:00
|
|
|
let reader = chunks.merge(merge_fn, &indexer);
|
2022-10-13 22:02:54 +02:00
|
|
|
let _ = lmdb_writer_sx.send(reader.map(serialize_fn));
|
2021-08-26 11:01:30 +02:00
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
let _ = lmdb_writer_sx.send(Err(e));
|
2021-08-24 13:01:31 +02:00
|
|
|
}
|
|
|
|
})
|
2021-08-16 13:36:30 +02:00
|
|
|
});
|
|
|
|
}
|
2021-08-24 13:01:31 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
/// Extract chunked data and send it into lmdb_writer_sx sender:
|
2021-08-24 13:01:31 +02:00
|
|
|
/// - documents
|
2022-03-23 17:28:41 +01:00
|
|
|
fn send_original_documents_data(
|
2023-09-28 16:26:01 +02:00
|
|
|
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
|
2023-09-06 12:20:25 +02:00
|
|
|
indexer: GrenadParameters,
|
2022-03-23 17:28:41 +01:00
|
|
|
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
2023-11-15 15:46:37 +01:00
|
|
|
field_id_map: FieldsIdsMap,
|
2023-12-13 15:38:44 +01:00
|
|
|
embedders: EmbeddingConfigs,
|
2022-03-23 17:28:41 +01:00
|
|
|
) -> Result<()> {
|
|
|
|
let original_documents_chunk =
|
|
|
|
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
|
|
|
|
|
2023-11-15 15:46:37 +01:00
|
|
|
let documents_chunk_cloned = original_documents_chunk.clone();
|
|
|
|
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
|
|
|
rayon::spawn(move || {
|
2023-12-12 21:19:48 +01:00
|
|
|
for (name, (embedder, prompt)) in embedders {
|
|
|
|
let result = extract_vector_points(
|
|
|
|
documents_chunk_cloned.clone(),
|
|
|
|
indexer,
|
|
|
|
&field_id_map,
|
|
|
|
&prompt,
|
2023-12-12 23:39:01 +01:00
|
|
|
&name,
|
2023-12-12 21:19:48 +01:00
|
|
|
);
|
|
|
|
match result {
|
|
|
|
Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => {
|
|
|
|
let embeddings = match extract_embeddings(prompts, indexer, embedder.clone()) {
|
2023-11-15 15:46:37 +01:00
|
|
|
Ok(results) => Some(results),
|
|
|
|
Err(error) => {
|
|
|
|
let _ = lmdb_writer_sx_cloned.send(Err(error));
|
|
|
|
None
|
|
|
|
}
|
2023-12-12 21:19:48 +01:00
|
|
|
};
|
|
|
|
|
|
|
|
if !(remove_vectors.is_empty()
|
|
|
|
&& manual_vectors.is_empty()
|
|
|
|
&& embeddings.as_ref().map_or(true, |e| e.is_empty()))
|
|
|
|
{
|
2023-12-07 17:35:45 +01:00
|
|
|
let _ = lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints {
|
|
|
|
remove_vectors,
|
|
|
|
embeddings,
|
2023-12-12 21:19:48 +01:00
|
|
|
expected_dimension: embedder.dimensions(),
|
2023-12-07 17:35:45 +01:00
|
|
|
manual_vectors,
|
2023-12-12 21:19:48 +01:00
|
|
|
embedder_name: name,
|
2023-12-07 17:35:45 +01:00
|
|
|
}));
|
|
|
|
}
|
2023-09-06 12:20:25 +02:00
|
|
|
}
|
2023-12-12 21:19:48 +01:00
|
|
|
|
|
|
|
Err(error) => {
|
|
|
|
let _ = lmdb_writer_sx_cloned.send(Err(error));
|
|
|
|
}
|
2023-12-07 17:35:45 +01:00
|
|
|
}
|
2023-12-12 21:19:48 +01:00
|
|
|
}
|
2023-11-15 15:46:37 +01:00
|
|
|
});
|
2023-09-06 12:20:25 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// TODO: create a custom internal error
|
|
|
|
lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))).unwrap();
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Extract chunked data and send it into lmdb_writer_sx sender:
|
2021-08-24 13:01:31 +02:00
|
|
|
/// - documents_ids
|
|
|
|
/// - docid_word_positions
|
|
|
|
/// - docid_fid_facet_numbers
|
|
|
|
/// - docid_fid_facet_strings
|
2022-07-19 09:30:19 +02:00
|
|
|
/// - docid_fid_facet_exists
|
2022-10-14 16:44:10 +02:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
|
|
#[allow(clippy::type_complexity)]
|
2022-03-23 17:28:41 +01:00
|
|
|
fn send_and_extract_flattened_documents_data(
|
2023-09-28 16:26:01 +02:00
|
|
|
flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
|
2021-08-24 13:01:31 +02:00
|
|
|
indexer: GrenadParameters,
|
|
|
|
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
|
|
|
searchable_fields: &Option<HashSet<FieldId>>,
|
|
|
|
faceted_fields: &HashSet<FieldId>,
|
2021-09-02 15:57:40 +02:00
|
|
|
primary_key_id: FieldId,
|
2022-03-23 17:28:41 +01:00
|
|
|
geo_fields_ids: Option<(FieldId, FieldId)>,
|
2021-08-24 13:01:31 +02:00
|
|
|
stop_words: &Option<fst::Set<&[u8]>>,
|
2023-08-10 10:44:07 +02:00
|
|
|
allowed_separators: &Option<&[&str]>,
|
|
|
|
dictionary: &Option<&[&str]>,
|
2021-10-06 12:11:07 +02:00
|
|
|
max_positions_per_attributes: Option<u32>,
|
2021-08-24 13:01:31 +02:00
|
|
|
) -> Result<(
|
|
|
|
grenad::Reader<CursorClonableMmap>,
|
2022-07-19 09:30:19 +02:00
|
|
|
(
|
|
|
|
grenad::Reader<CursorClonableMmap>,
|
2023-03-14 18:08:12 +01:00
|
|
|
(
|
|
|
|
grenad::Reader<CursorClonableMmap>,
|
2023-09-28 16:26:01 +02:00
|
|
|
(
|
|
|
|
grenad::Reader<BufReader<File>>,
|
|
|
|
(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>),
|
|
|
|
),
|
2023-03-14 18:08:12 +01:00
|
|
|
),
|
2022-07-19 09:30:19 +02:00
|
|
|
),
|
2021-08-24 13:01:31 +02:00
|
|
|
)> {
|
2022-03-23 17:28:41 +01:00
|
|
|
let flattened_documents_chunk =
|
|
|
|
flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
|
2021-08-24 13:01:31 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
if let Some(geo_fields_ids) = geo_fields_ids {
|
|
|
|
let documents_chunk_cloned = flattened_documents_chunk.clone();
|
2021-08-30 15:47:11 +02:00
|
|
|
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
|
|
|
rayon::spawn(move || {
|
2021-09-09 12:20:08 +02:00
|
|
|
let result =
|
2022-03-23 17:28:41 +01:00
|
|
|
extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_fields_ids);
|
2021-09-09 12:20:08 +02:00
|
|
|
let _ = match result {
|
2021-08-30 15:47:11 +02:00
|
|
|
Ok(geo_points) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoPoints(geo_points))),
|
|
|
|
Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
|
|
|
|
};
|
|
|
|
});
|
|
|
|
}
|
2021-08-25 16:59:38 +02:00
|
|
|
|
2023-10-18 13:53:58 +02:00
|
|
|
let (docid_word_positions_chunk, fid_docid_facet_values_chunks): (Result<_>, Result<_>) =
|
2021-08-24 13:01:31 +02:00
|
|
|
rayon::join(
|
|
|
|
|| {
|
2023-11-06 10:31:14 +01:00
|
|
|
let (docid_word_positions_chunk, script_language_pair) =
|
2022-10-17 13:51:04 +02:00
|
|
|
extract_docid_word_positions(
|
|
|
|
flattened_documents_chunk.clone(),
|
2023-02-21 10:18:44 +01:00
|
|
|
indexer,
|
2022-10-17 13:51:04 +02:00
|
|
|
searchable_fields,
|
|
|
|
stop_words.as_ref(),
|
2023-08-10 10:44:07 +02:00
|
|
|
*allowed_separators,
|
|
|
|
*dictionary,
|
2022-10-17 13:51:04 +02:00
|
|
|
max_positions_per_attributes,
|
|
|
|
)?;
|
2021-08-24 13:01:31 +02:00
|
|
|
|
|
|
|
// send docid_word_positions_chunk to DB writer
|
|
|
|
let docid_word_positions_chunk =
|
2022-02-16 15:40:08 +01:00
|
|
|
unsafe { as_cloneable_grenad(&docid_word_positions_chunk)? };
|
2021-08-26 11:01:30 +02:00
|
|
|
|
2022-10-17 13:51:04 +02:00
|
|
|
let _ =
|
|
|
|
lmdb_writer_sx.send(Ok(TypedChunk::ScriptLanguageDocids(script_language_pair)));
|
2022-10-12 13:24:56 +02:00
|
|
|
|
2021-08-24 13:01:31 +02:00
|
|
|
Ok(docid_word_positions_chunk)
|
|
|
|
},
|
|
|
|
|| {
|
2023-03-09 13:21:21 +01:00
|
|
|
let ExtractedFacetValues {
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_numbers_chunk,
|
|
|
|
fid_docid_facet_strings_chunk,
|
2023-03-08 16:46:42 +01:00
|
|
|
fid_facet_is_null_docids_chunk,
|
2023-03-14 18:08:12 +01:00
|
|
|
fid_facet_is_empty_docids_chunk,
|
2022-07-19 14:42:35 +02:00
|
|
|
fid_facet_exists_docids_chunk,
|
2023-03-09 13:21:21 +01:00
|
|
|
} = extract_fid_docid_facet_values(
|
2022-07-19 09:30:19 +02:00
|
|
|
flattened_documents_chunk.clone(),
|
2022-10-13 22:02:54 +02:00
|
|
|
indexer,
|
2022-07-19 09:30:19 +02:00
|
|
|
faceted_fields,
|
2023-08-08 16:28:07 +02:00
|
|
|
geo_fields_ids,
|
2022-07-19 09:30:19 +02:00
|
|
|
)?;
|
2021-08-24 13:01:31 +02:00
|
|
|
|
2023-10-18 13:53:58 +02:00
|
|
|
// send fid_docid_facet_numbers_chunk to DB writer
|
|
|
|
let fid_docid_facet_numbers_chunk =
|
|
|
|
unsafe { as_cloneable_grenad(&fid_docid_facet_numbers_chunk)? };
|
2021-08-26 11:01:30 +02:00
|
|
|
|
|
|
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_numbers_chunk.clone(),
|
2021-08-26 11:01:30 +02:00
|
|
|
)));
|
2021-08-24 13:01:31 +02:00
|
|
|
|
2023-10-18 13:53:58 +02:00
|
|
|
// send fid_docid_facet_strings_chunk to DB writer
|
|
|
|
let fid_docid_facet_strings_chunk =
|
|
|
|
unsafe { as_cloneable_grenad(&fid_docid_facet_strings_chunk)? };
|
2021-08-26 11:01:30 +02:00
|
|
|
|
|
|
|
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_strings_chunk.clone(),
|
2021-08-26 11:01:30 +02:00
|
|
|
)));
|
2021-08-24 13:01:31 +02:00
|
|
|
|
2022-07-19 09:30:19 +02:00
|
|
|
Ok((
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_numbers_chunk,
|
2023-03-08 16:46:42 +01:00
|
|
|
(
|
2023-10-18 13:53:58 +02:00
|
|
|
fid_docid_facet_strings_chunk,
|
2023-03-14 18:08:12 +01:00
|
|
|
(
|
|
|
|
fid_facet_is_null_docids_chunk,
|
|
|
|
(fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk),
|
|
|
|
),
|
2023-03-08 16:46:42 +01:00
|
|
|
),
|
2022-07-19 09:30:19 +02:00
|
|
|
))
|
2021-08-24 13:01:31 +02:00
|
|
|
},
|
|
|
|
);
|
2021-09-02 15:17:52 +02:00
|
|
|
|
2023-10-18 13:53:58 +02:00
|
|
|
Ok((docid_word_positions_chunk?, fid_docid_facet_values_chunks?))
|
2021-08-24 13:01:31 +02:00
|
|
|
}
|