Merge branch 'main' into tmp-release-v1.3.0

This commit is contained in:
ManyTheFish 2023-08-01 15:05:17 +02:00
commit b45c36cd71
37 changed files with 295 additions and 71 deletions

View file

@ -65,13 +65,16 @@ filter-parser = { path = "../filter-parser" }
# documents words self-join
itertools = "0.10.5"
# profiling
puffin = "0.16.0"
# logging
log = "0.4.17"
logging_timer = "1.1.0"
csv = "1.2.1"
[dev-dependencies]
mimalloc = { version = "0.1.29", default-features = false }
mimalloc = { version = "0.1.37", default-features = false }
big_s = "1.0.2"
insta = "1.29.0"
maplit = "1.0.2"

View file

@ -15,6 +15,8 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
}
pub fn execute(self) -> Result<u64> {
puffin::profile_function!();
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let Index {
env: _env,

View file

@ -109,6 +109,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
Some(docid)
}
pub fn execute(self) -> Result<DocumentDeletionResult> {
puffin::profile_function!();
let DetailedDocumentDeletionResult { deleted_documents, remaining_documents } =
self.execute_inner()?;

View file

@ -31,6 +31,8 @@ pub fn enrich_documents_batch<R: Read + Seek>(
autogenerate_docids: bool,
reader: DocumentsBatchReader<R>,
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
puffin::profile_function!();
let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;

View file

@ -30,6 +30,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
stop_words: Option<&fst::Set<&[u8]>>,
max_positions_per_attributes: Option<u32>,
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread();

View file

@ -20,6 +20,8 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut facet_number_docids_sorter = create_sorter(

View file

@ -18,6 +18,8 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
docid_fid_facet_string: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut facet_string_docids_sorter = create_sorter(

View file

@ -34,6 +34,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer: GrenadParameters,
faceted_fields: &HashSet<FieldId>,
) -> Result<ExtractedFacetValues> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter(

View file

@ -22,6 +22,8 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter(

View file

@ -19,6 +19,8 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
primary_key_id: FieldId,
(lat_fid, lng_fid): (FieldId, FieldId),
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,

View file

@ -19,6 +19,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
primary_key_id: FieldId,
vectors_fid: FieldId,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,

View file

@ -27,6 +27,8 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
exact_attributes: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_docids_sorter = create_sorter(

View file

@ -15,6 +15,8 @@ pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter(

View file

@ -21,6 +21,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorter = create_sorter(

View file

@ -18,6 +18,8 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter(

View file

@ -52,6 +52,8 @@ pub(crate) fn data_from_obkv_documents(
max_positions_per_attributes: Option<u32>,
exact_attributes: HashSet<FieldId>,
) -> Result<()> {
puffin::profile_function!();
original_obkv_chunks
.par_bridge()
.map(|original_documents_chunk| {
@ -238,11 +240,13 @@ fn spawn_extraction_task<FE, FS, M>(
M::Output: Send,
{
rayon::spawn(move || {
puffin::profile_scope!("extract_multiple_chunks", name);
let chunks: Result<M> =
chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect();
rayon::spawn(move || match chunks {
Ok(chunks) => {
debug!("merge {} database", name);
puffin::profile_scope!("merge_multiple_chunks", name);
let reader = chunks.merge(merge_fn, &indexer);
let _ = lmdb_writer_sx.send(reader.map(serialize_fn));
}

View file

@ -214,6 +214,7 @@ pub fn sorter_into_lmdb_database(
sorter: Sorter<MergeFn>,
merge: MergeFn,
) -> Result<()> {
puffin::profile_function!();
debug!("Writing MTBL sorter...");
let before = Instant::now();

View file

@ -137,6 +137,8 @@ where
mut self,
reader: DocumentsBatchReader<R>,
) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add
if reader.is_empty() {
return Ok((self, Ok(0)));
@ -175,6 +177,8 @@ where
mut self,
to_delete: Vec<String>,
) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add
if to_delete.is_empty() {
return Ok((self, Ok(0)));
@ -194,6 +198,8 @@ where
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute(mut self) -> Result<DocumentAdditionResult> {
puffin::profile_function!();
if self.added_documents == 0 {
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
@ -232,6 +238,8 @@ where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let TransformOutput {
primary_key,
fields_ids_map,
@ -322,6 +330,7 @@ where
// Run extraction pipeline in parallel.
pool.install(|| {
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
@ -477,6 +486,8 @@ where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
// Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT;
@ -511,26 +522,36 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
let current_prefix_fst;
let common_prefix_fst_words_tmp;
let common_prefix_fst_words: Vec<_>;
let new_prefix_fst_words;
let del_prefix_fst_words;
// We retrieve the common words between the previous and new prefix word fst.
let common_prefix_fst_words = fst_stream_into_vec(
previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(),
);
let common_prefix_fst_words: Vec<_> = common_prefix_fst_words
.as_slice()
.linear_group_by_key(|x| x.chars().next().unwrap())
.collect();
{
puffin::profile_scope!("compute_prefix_diffs");
// We retrieve the newly added words between the previous and new prefix word fst.
let new_prefix_fst_words = fst_stream_into_vec(
current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(),
);
current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We compute the set of prefixes that are no more part of the prefix fst.
let del_prefix_fst_words = fst_stream_into_hashset(
previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(),
);
// We retrieve the common words between the previous and new prefix word fst.
common_prefix_fst_words_tmp = fst_stream_into_vec(
previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(),
);
common_prefix_fst_words = common_prefix_fst_words_tmp
.as_slice()
.linear_group_by_key(|x| x.chars().next().unwrap())
.collect();
// We retrieve the newly added words between the previous and new prefix word fst.
new_prefix_fst_words = fst_stream_into_vec(
current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(),
);
// We compute the set of prefixes that are no more part of the prefix fst.
del_prefix_fst_words = fst_stream_into_hashset(
previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(),
);
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -668,6 +689,8 @@ fn execute_word_prefix_docids(
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
let cursor = reader.into_cursor()?;
let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db);
builder.chunk_compression_type = indexer_config.chunk_compression_type;

View file

@ -558,6 +558,8 @@ impl<'a, 'i> Transform<'a, 'i> {
where
F: Fn(UpdateIndexingStep) + Sync,
{
puffin::profile_function!();
let primary_key = self
.index
.primary_key(wtxn)?

View file

@ -46,6 +46,66 @@ pub(crate) enum TypedChunk {
ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
}
impl TypedChunk {
pub fn to_debug_string(&self) -> String {
match self {
TypedChunk::FieldIdDocidFacetStrings(grenad) => {
format!("FieldIdDocidFacetStrings {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdDocidFacetNumbers(grenad) => {
format!("FieldIdDocidFacetNumbers {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::Documents(grenad) => {
format!("Documents {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdWordcountDocids(grenad) => {
format!("FieldIdWordcountDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::NewDocumentsIds(grenad) => {
format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => format!(
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {} }}",
word_docids_reader.len(),
exact_word_docids_reader.len()
),
TypedChunk::WordPositionDocids(grenad) => {
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordFidDocids(grenad) => {
format!("WordFidDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordPairProximityDocids(grenad) => {
format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetStringDocids(grenad) => {
format!("FieldIdFacetStringDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetNumberDocids(grenad) => {
format!("FieldIdFacetNumberDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetExistsDocids(grenad) => {
format!("FieldIdFacetExistsDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsNullDocids(grenad) => {
format!("FieldIdFacetIsNullDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsEmptyDocids(grenad) => {
format!("FieldIdFacetIsEmptyDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::GeoPoints(grenad) => {
format!("GeoPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::VectorPoints(grenad) => {
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::ScriptLanguageDocids(grenad) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", grenad.len())
}
}
}
}
/// Write typed chunk in the corresponding LMDB database of the provided index.
/// Return new documents seen.
pub(crate) fn write_typed_chunk_into_index(
@ -54,6 +114,8 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn: &mut RwTxn,
index_is_empty: bool,
) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunk.to_debug_string());
let mut is_merged_database = false;
match typed_chunk {
TypedChunk::Documents(obkv_documents_iter) => {
@ -350,6 +412,8 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
puffin::profile_function!(format!("number of entries: {}", data.len()));
let mut buffer = Vec::new();
let database = database.remap_types::<ByteSlice, ByteSlice>();
@ -392,6 +456,8 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
puffin::profile_function!(format!("number of entries: {}", data.len()));
if !index_is_empty {
return write_entries_into_database(
data,

View file

@ -50,6 +50,8 @@ impl<'t, 'u, 'i> PrefixWordPairsProximityDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&'a [String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
index_word_prefix_database(
self.wtxn,
self.index.word_pair_proximity_docids,

View file

@ -27,6 +27,8 @@ pub fn index_prefix_word_database(
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> Result<()> {
puffin::profile_function!();
let max_proximity = max_proximity - 1;
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");

View file

@ -191,6 +191,7 @@ pub fn index_word_prefix_database(
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
// Make a prefix trie from the common prefixes that are shorter than self.max_prefix_length

View file

@ -303,6 +303,8 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
// if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition.

View file

@ -45,6 +45,8 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(

View file

@ -50,6 +50,7 @@ impl<'t, 'u, 'i> WordPrefixIntegerDocids<'t, 'u, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word levels integers docids into LMDB on disk...");
let mut prefix_integer_docids_sorter = create_sorter(

View file

@ -42,6 +42,8 @@ impl<'t, 'u, 'i> WordsPrefixesFst<'t, 'u, 'i> {
#[logging_timer::time("WordsPrefixesFst::{}")]
pub fn execute(self) -> Result<()> {
puffin::profile_function!();
let words_fst = self.index.words_fst(self.wtxn)?;
let mut current_prefix = vec![SmallString32::new(); self.max_prefix_length];