From 99d35fb9403befc55ab3c48eae8d60cadb8e2a4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 30 Dec 2019 14:37:31 +0100 Subject: [PATCH] Introduce a first version of a number of candidates reducer It works by ignoring the postings lists associated to documents that the previous words did not returned --- meilisearch-core/src/bucket_sort.rs | 22 ++++-- .../src/update/documents_addition.rs | 68 +++++++++++++------ meilisearch-core/src/update/mod.rs | 3 - 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/meilisearch-core/src/bucket_sort.rs b/meilisearch-core/src/bucket_sort.rs index 3d3f11587..8e820c71f 100644 --- a/meilisearch-core/src/bucket_sort.rs +++ b/meilisearch-core/src/bucket_sort.rs @@ -1,11 +1,12 @@ -use std::ops::Deref; -use std::{cmp, fmt}; use std::borrow::Cow; +use std::collections::HashSet; use std::mem; +use std::ops::Deref; use std::ops::Range; use std::rc::Rc; -use std::time::{Duration, Instant}; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::{Duration, Instant}; +use std::{cmp, fmt}; use compact_arena::{SmallArena, Idx32, mk_arena}; use fst::{IntoStreamer, Streamer}; @@ -496,6 +497,7 @@ fn fetch_matches<'txn, 'tag>( debug!("words fst len {} and size {}", words.len(), words.as_fst().as_bytes().len()); let mut total_postings_lists = Vec::new(); + let mut documents_ids = HashSet::::new(); let mut dfa_time = Duration::default(); let mut postings_lists_fetching_time = Duration::default(); @@ -509,6 +511,8 @@ fn fetch_matches<'txn, 'tag>( let mut stream_next_time = Duration::default(); let mut number_of_words = 0; + let mut postings_lists_original_length = 0; + let mut postings_lists_length = 0; let byte = query.as_bytes()[0]; let mut stream = if byte == u8::max_value() { @@ -535,14 +539,22 @@ fn fetch_matches<'txn, 'tag>( let before_postings_lists_fetching = Instant::now(); if let Some(postings_list) = postings_lists_store.postings_list(reader, input)? { + postings_lists_original_length += postings_list.len(); + let input = Rc::from(input); let postings_list = Rc::new(postings_list); let postings_list_view = PostingsListView::original(input, postings_list); let mut offset = 0; for group in postings_list_view.linear_group_by_key(|di| di.document_id) { - let posting_list_index = arena.add(postings_list_view.range(offset, group.len())); let document_id = group[0].document_id; + + if query_index != 0 && !documents_ids.contains(&document_id) { continue } + documents_ids.insert(document_id); + + postings_lists_length += group.len(); + + let posting_list_index = arena.add(postings_list_view.range(offset, group.len())); let bare_match = BareMatch { document_id, query_index: query_index as u16, @@ -559,6 +571,8 @@ fn fetch_matches<'txn, 'tag>( } debug!("{:?} gives {} words", query, number_of_words); + debug!("{:?} gives postings lists of length {} (original was {})", + query, postings_lists_length, postings_lists_original_length); debug!("stream next took {:.02?}", stream_next_time); } diff --git a/meilisearch-core/src/update/documents_addition.rs b/meilisearch-core/src/update/documents_addition.rs index 6a4733d01..c77ff012a 100644 --- a/meilisearch-core/src/update/documents_addition.rs +++ b/meilisearch-core/src/update/documents_addition.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::borrow::Cow; use fst::{set::OpBuilder, SetBuilder, IntoStreamer, Streamer}; -use sdset::{duo::Union, SetOperation, SetBuf}; +use sdset::{duo::Union, SetOperation, Set, SetBuf}; use serde::{Deserialize, Serialize}; use log::debug; @@ -196,36 +196,66 @@ pub fn apply_documents_addition<'a, 'b>( let pplc_store = prefix_postings_lists_cache_store; pplc_store.clear(writer)?; - const MAX_PREFIX_LENGTH: usize = 1; + let mut previous_prefix: Option<([u8; 4], Vec<_>)> = None; // compute prefixes and store those in the PrefixPostingsListsCache. let mut stream = words_fst.into_stream(); while let Some(input) = stream.next() { - for i in 1..=MAX_PREFIX_LENGTH { - let prefix = &input[..i]; - if let Some(postings_list) = postings_lists_store.postings_list(writer, prefix)? { - if let (Ok(input), Ok(prefix)) = (std::str::from_utf8(input), std::str::from_utf8(prefix)) { - debug!("{:?} postings list (prefix {:?}) length {}", input, prefix, postings_list.len()); - } + if let Some(postings_list) = postings_lists_store.postings_list(writer, input)?.map(Cow::into_owned) { + let prefix = &input[..1]; - // compute the new prefix postings lists - let mut p = [0; 4]; - let len = std::cmp::min(4, prefix.len()); - p[..len].copy_from_slice(&prefix[..len]); + let mut arr = [0; 4]; + let len = std::cmp::min(4, prefix.len()); + arr[..len].copy_from_slice(prefix); + let arr_prefix = arr; - let previous = match pplc_store.prefix_postings_list(writer, p)? { - Some(previous) => previous, - None => Cow::Owned(SetBuf::default()), - }; + // if let (Ok(input), Ok(prefix)) = (std::str::from_utf8(input), std::str::from_utf8(prefix)) { + // debug!("{:?} postings list (prefix {:?}) length {}", input, prefix, postings_list.len()); + // } - let new_postings_list = Union::new(&postings_list, &previous).into_set_buf(); - pplc_store.put_prefix_postings_list(writer, p, &new_postings_list)?; + match previous_prefix { + Some((ref mut prev_prefix, ref mut prev_postings_list)) if *prev_prefix != arr_prefix => { + prev_postings_list.sort_unstable(); + prev_postings_list.dedup(); - debug!("new length {}", new_postings_list.len()); + if let Ok(prefix) = std::str::from_utf8(&prev_prefix[..1]) { + debug!("writing the prefix of {:?} of length {}", + prefix, prev_postings_list.len()); + } + + let pls = Set::new_unchecked(&prev_postings_list); + pplc_store.put_prefix_postings_list(writer, *prev_prefix, &pls)?; + + *prev_prefix = arr_prefix; + prev_postings_list.clear(); + prev_postings_list.extend_from_slice(&postings_list); + }, + Some((_, ref mut prev_postings_list)) => { + prev_postings_list.extend_from_slice(&postings_list); + }, + None => { + let mut arr = [0; 4]; + let len = std::cmp::min(4, prefix.len()); + arr[..len].copy_from_slice(&prefix[..len]); + + let prev_prefix = arr; + previous_prefix = Some((prev_prefix, postings_list.to_vec())); + }, } + + // debug!("new length {}", new_postings_list.len()); } } + // write the last prefix postings lists + if let Some((prev_prefix, mut prev_postings_list)) = previous_prefix.take() { + prev_postings_list.sort_unstable(); + prev_postings_list.dedup(); + + let pls = Set::new_unchecked(&prev_postings_list); + pplc_store.put_prefix_postings_list(writer, prev_prefix, &pls)?; + } + Ok(()) } diff --git a/meilisearch-core/src/update/mod.rs b/meilisearch-core/src/update/mod.rs index 265a6e193..0f8b68a73 100644 --- a/meilisearch-core/src/update/mod.rs +++ b/meilisearch-core/src/update/mod.rs @@ -23,15 +23,12 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::time::Instant; use chrono::{DateTime, Utc}; -use fst::{IntoStreamer, Streamer}; use heed::Result as ZResult; use log::debug; use serde::{Deserialize, Serialize}; use crate::{store, DocumentId, MResult}; use crate::database::{MainT, UpdateT}; -use crate::bucket_sort::bucket_sort; -use crate::criterion::Criteria; use meilisearch_schema::Schema; #[derive(Debug, Clone, Serialize, Deserialize)]