diff --git a/Cargo.lock b/Cargo.lock
index 4bba1bb00..89f3561bc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -468,7 +468,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
 
 [[package]]
 name = "benchmarks"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "anyhow",
  "bytes",
@@ -1206,7 +1206,7 @@ dependencies = [
 
 [[package]]
 name = "dump"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "anyhow",
  "big_s",
@@ -1417,7 +1417,7 @@ dependencies = [
 
 [[package]]
 name = "file-store"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "faux",
  "tempfile",
@@ -1439,7 +1439,7 @@ dependencies = [
 
 [[package]]
 name = "filter-parser"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "insta",
  "nom",
@@ -1459,7 +1459,7 @@ dependencies = [
 
 [[package]]
 name = "flatten-serde-json"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "criterion",
  "serde_json",
@@ -1577,7 +1577,7 @@ dependencies = [
 
 [[package]]
 name = "fuzzers"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "arbitrary",
  "clap",
@@ -1891,7 +1891,7 @@ dependencies = [
 
 [[package]]
 name = "index-scheduler"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "anyhow",
  "big_s",
@@ -2088,7 +2088,7 @@ dependencies = [
 
 [[package]]
 name = "json-depth-checker"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "criterion",
  "serde_json",
@@ -2500,7 +2500,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
 
 [[package]]
 name = "meili-snap"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "insta",
  "md5",
@@ -2509,7 +2509,7 @@ dependencies = [
 
 [[package]]
 name = "meilisearch"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "actix-cors",
  "actix-http",
@@ -2600,7 +2600,7 @@ dependencies = [
 
 [[package]]
 name = "meilisearch-auth"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "base64 0.21.2",
  "enum-iterator",
@@ -2619,7 +2619,7 @@ dependencies = [
 
 [[package]]
 name = "meilisearch-types"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "actix-web",
  "anyhow",
@@ -2673,7 +2673,7 @@ dependencies = [
 
 [[package]]
 name = "milli"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "big_s",
  "bimap",
@@ -2995,7 +2995,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
 
 [[package]]
 name = "permissive-json-pointer"
-version = "1.4.0"
+version = "1.4.1"
 dependencies = [
  "big_s",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index 9c89aadfb..05c7b1012 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,7 +18,7 @@ members = [
 ]
 
 [workspace.package]
-version = "1.4.0"
+version = "1.4.1"
 authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"]
 description = "Meilisearch HTTP server"
 homepage = "https://meilisearch.com"
diff --git a/meilisearch/tests/search/distinct.rs b/meilisearch/tests/search/distinct.rs
new file mode 100644
index 000000000..93c5197a6
--- /dev/null
+++ b/meilisearch/tests/search/distinct.rs
@@ -0,0 +1,63 @@
+use meili_snap::snapshot;
+use once_cell::sync::Lazy;
+
+use crate::common::{Server, Value};
+use crate::json;
+
+pub(self) static DOCUMENTS: Lazy<Value> = Lazy::new(|| {
+    json!([
+        {"productId": 1, "shopId": 1},
+        {"productId": 2, "shopId": 1},
+        {"productId": 3, "shopId": 2},
+        {"productId": 4, "shopId": 2},
+        {"productId": 5, "shopId": 3},
+        {"productId": 6, "shopId": 3},
+        {"productId": 7, "shopId": 4},
+        {"productId": 8, "shopId": 4},
+        {"productId": 9, "shopId": 5},
+        {"productId": 10, "shopId": 5}
+    ])
+});
+
+pub(self) static DOCUMENT_PRIMARY_KEY: &str = "productId";
+pub(self) static DOCUMENT_DISTINCT_KEY: &str = "shopId";
&str = "shopId"; + +/// testing: https://github.com/meilisearch/meilisearch/issues/4078 +#[actix_rt::test] +async fn distinct_search_with_offset_no_ranking() { + let server = Server::new().await; + let index = server.index("test"); + + let documents = DOCUMENTS.clone(); + index.add_documents(documents, Some(DOCUMENT_PRIMARY_KEY)).await; + index.update_distinct_attribute(json!(DOCUMENT_DISTINCT_KEY)).await; + index.wait_task(1).await; + + fn get_hits(Value(response): Value) -> Vec { + let hits_array = response["hits"].as_array().unwrap(); + hits_array.iter().map(|h| h[DOCUMENT_DISTINCT_KEY].as_i64().unwrap()).collect::>() + } + + let (response, code) = index.search_post(json!({"limit": 2, "offset": 0})).await; + let hits = get_hits(response); + snapshot!(code, @"200 OK"); + snapshot!(hits.len(), @"2"); + snapshot!(format!("{:?}", hits), @"[1, 2]"); + + let (response, code) = index.search_post(json!({"limit": 2, "offset": 2})).await; + let hits = get_hits(response); + snapshot!(code, @"200 OK"); + snapshot!(hits.len(), @"2"); + snapshot!(format!("{:?}", hits), @"[3, 4]"); + + let (response, code) = index.search_post(json!({"limit": 10, "offset": 4})).await; + let hits = get_hits(response); + snapshot!(code, @"200 OK"); + snapshot!(hits.len(), @"1"); + snapshot!(format!("{:?}", hits), @"[5]"); + + let (response, code) = index.search_post(json!({"limit": 10, "offset": 5})).await; + let hits = get_hits(response); + snapshot!(code, @"200 OK"); + snapshot!(hits.len(), @"0"); +} diff --git a/meilisearch/tests/search/mod.rs b/meilisearch/tests/search/mod.rs index 44051eb39..3428da467 100644 --- a/meilisearch/tests/search/mod.rs +++ b/meilisearch/tests/search/mod.rs @@ -1,6 +1,7 @@ // This modules contains all the test concerning search. Each particular feature of the search // should be tested in its own module to isolate tests and keep the tests readable. 
+mod distinct;
 mod errors;
 mod facet_search;
 mod formatted;
diff --git a/milli/src/documents/enriched.rs b/milli/src/documents/enriched.rs
index fa21c0f87..4e1320c6c 100644
--- a/milli/src/documents/enriched.rs
+++ b/milli/src/documents/enriched.rs
@@ -1,4 +1,5 @@
 use std::fs::File;
+use std::io::BufReader;
 use std::{io, str};
 
 use obkv::KvReader;
@@ -19,14 +20,14 @@ use crate::FieldId;
 pub struct EnrichedDocumentsBatchReader<R> {
     documents: DocumentsBatchReader<R>,
     primary_key: String,
-    external_ids: grenad::ReaderCursor<File>,
+    external_ids: grenad::ReaderCursor<BufReader<File>>,
 }
 
 impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
     pub fn new(
         documents: DocumentsBatchReader<R>,
         primary_key: String,
-        external_ids: grenad::Reader<File>,
+        external_ids: grenad::Reader<BufReader<File>>,
     ) -> Result<Self, Error> {
         if documents.documents_count() as u64 == external_ids.len() {
             Ok(EnrichedDocumentsBatchReader {
@@ -75,7 +76,7 @@ pub struct EnrichedDocument<'a> {
 pub struct EnrichedDocumentsBatchCursor<R> {
     documents: DocumentsBatchCursor<R>,
     primary_key: String,
-    external_ids: grenad::ReaderCursor<File>,
+    external_ids: grenad::ReaderCursor<BufReader<File>>,
 }
 
 impl<R> EnrichedDocumentsBatchCursor<R> {
diff --git a/milli/src/search/new/bucket_sort.rs b/milli/src/search/new/bucket_sort.rs
index 03e613b37..cf2f08cce 100644
--- a/milli/src/search/new/bucket_sort.rs
+++ b/milli/src/search/new/bucket_sort.rs
@@ -46,18 +46,27 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
     if let Some(distinct_fid) = distinct_fid {
         let mut excluded = RoaringBitmap::new();
         let mut results = vec![];
+        let mut skip = 0;
         for docid in universe.iter() {
-            if results.len() >= from + length {
+            if results.len() >= length {
                 break;
             }
             if excluded.contains(docid) {
                 continue;
             }
+
             distinct_single_docid(ctx.index, ctx.txn, distinct_fid, docid, &mut excluded)?;
+            skip += 1;
+            if skip <= from {
+                continue;
+            }
+
             results.push(docid);
         }
+
         let mut all_candidates = universe - excluded;
         all_candidates.extend(results.iter().copied());
+
         return Ok(BucketSortOutput {
             scores: vec![Default::default(); results.len()],
             docids: results,
diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs
index 30f15ebab..a3f0c8f71 100644
--- a/milli/src/update/facet/bulk.rs
+++ b/milli/src/update/facet/bulk.rs
@@ -1,5 +1,6 @@
 use std::borrow::Cow;
 use std::fs::File;
+use std::io::BufReader;
 
 use grenad::CompressionType;
 use heed::types::ByteSlice;
@@ -30,7 +31,7 @@ pub struct FacetsUpdateBulk<'i> {
     facet_type: FacetType,
     field_ids: Vec<FieldId>,
     // None if level 0 does not need to be updated
-    new_data: Option<grenad::Reader<File>>,
+    new_data: Option<grenad::Reader<BufReader<File>>>,
 }
 
 impl<'i> FacetsUpdateBulk<'i> {
@@ -38,7 +39,7 @@ impl<'i> FacetsUpdateBulk<'i> {
         index: &'i Index,
         field_ids: Vec<FieldId>,
         facet_type: FacetType,
-        new_data: grenad::Reader<File>,
+        new_data: grenad::Reader<BufReader<File>>,
         group_size: u8,
         min_level_size: u8,
     ) -> FacetsUpdateBulk<'i> {
@@ -187,7 +188,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
         &self,
         field_id: FieldId,
         txn: &RoTxn,
-    ) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
+    ) -> Result<(Vec<grenad::Reader<BufReader<File>>>, RoaringBitmap)> {
         let mut all_docids = RoaringBitmap::new();
         let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |bitmaps, _| {
             for bitmap in bitmaps {
@@ -259,7 +260,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
         field_id: u16,
         level: u8,
         handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
-    ) -> Result<Vec<grenad::Reader<File>>> {
+    ) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
         if level == 0 {
             self.read_level_0(rtxn, field_id, handle_group)?;
             // Level 0 is already in the database
diff --git a/milli/src/update/facet/incremental.rs b/milli/src/update/facet/incremental.rs
index a921d4115..743c0b038 100644
--- a/milli/src/update/facet/incremental.rs
+++ b/milli/src/update/facet/incremental.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::fs::File;
+use std::io::BufReader;
 
 use heed::types::{ByteSlice, DecodeIgnore};
 use heed::{BytesDecode, Error, RoTxn, RwTxn};
@@ -34,14 +35,14 @@ pub struct FacetsUpdateIncremental<'i> {
     index: &'i Index,
     inner: FacetsUpdateIncrementalInner,
     facet_type: FacetType,
-    new_data: grenad::Reader<File>,
+    new_data: grenad::Reader<BufReader<File>>,
 }
 
 impl<'i> FacetsUpdateIncremental<'i> {
     pub fn new(
         index: &'i Index,
         facet_type: FacetType,
-        new_data: grenad::Reader<File>,
+        new_data: grenad::Reader<BufReader<File>>,
         group_size: u8,
         min_level_size: u8,
         max_group_size: u8,
diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs
index 15776a709..bbd25f91e 100644
--- a/milli/src/update/facet/mod.rs
+++ b/milli/src/update/facet/mod.rs
@@ -78,6 +78,7 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
 
 use std::collections::BTreeSet;
 use std::fs::File;
+use std::io::BufReader;
 use std::iter::FromIterator;
 
 use charabia::normalizer::{Normalize, NormalizerOption};
@@ -108,13 +109,17 @@ pub struct FacetsUpdate<'i> {
     index: &'i Index,
     database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
     facet_type: FacetType,
-    new_data: grenad::Reader<File>,
+    new_data: grenad::Reader<BufReader<File>>,
     group_size: u8,
     max_group_size: u8,
     min_level_size: u8,
 }
 impl<'i> FacetsUpdate<'i> {
-    pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
+    pub fn new(
+        index: &'i Index,
+        facet_type: FacetType,
+        new_data: grenad::Reader<BufReader<File>>,
+    ) -> Self {
         let database = match facet_type {
             FacetType::String => index
                 .facet_id_string_docids
diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs
index 35a7c33f3..22b16f253 100644
--- a/milli/src/update/index_documents/enrich.rs
+++ b/milli/src/update/index_documents/enrich.rs
@@ -1,4 +1,4 @@
-use std::io::{Read, Seek};
+use std::io::{BufWriter, Read, Seek};
 use std::result::Result as StdResult;
 use std::{fmt, iter};
 
@@ -35,7 +35,7 @@ pub fn enrich_documents_batch<R: Read + Seek>(
 
     let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
 
-    let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
+    let mut external_ids = tempfile::tempfile().map(BufWriter::new).map(grenad::Writer::new)?;
     let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];
 
     // The primary key *field id* that has already been set for this index or the one
diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
index 1c24a0fcf..643d16354 100644
--- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
+++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
 use std::fs::File;
+use std::io::BufReader;
 use std::{io, mem, str};
 
 use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
@@ -31,7 +32,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
     allowed_separators: Option<&[&str]>,
     dictionary: Option<&[&str]>,
     max_positions_per_attributes: Option<u32>,
-) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
+) -> Result<(RoaringBitmap, grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
     puffin::profile_function!();
 
     let max_positions_per_attributes = max_positions_per_attributes
diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
index dec02b120..d557e0b6c 100644
--- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use heed::{BytesDecode, BytesEncode};
 
@@ -19,7 +19,7 @@ use crate::Result;
 pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
     docid_fid_facet_number: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
index 0035f54e1..b1b27449e 100644
--- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use heed::BytesEncode;
 
@@ -17,7 +17,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
 pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
     docid_fid_facet_string: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
index 5496a071b..42c355323 100644
--- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
@@ -1,7 +1,7 @@
 use std::collections::{BTreeMap, HashSet};
 use std::convert::TryInto;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 use std::mem::size_of;
 
 use heed::zerocopy::AsBytes;
@@ -17,11 +17,11 @@ use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET
 
 /// The extracted facet values stored in grenad files by type.
 pub struct ExtractedFacetValues {
-    pub docid_fid_facet_numbers_chunk: grenad::Reader<File>,
-    pub docid_fid_facet_strings_chunk: grenad::Reader<File>,
-    pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
-    pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
-    pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
+    pub docid_fid_facet_numbers_chunk: grenad::Reader<BufReader<File>>,
+    pub docid_fid_facet_strings_chunk: grenad::Reader<BufReader<File>>,
+    pub fid_facet_is_null_docids_chunk: grenad::Reader<BufReader<File>>,
+    pub fid_facet_is_empty_docids_chunk: grenad::Reader<BufReader<File>>,
+    pub fid_facet_exists_docids_chunk: grenad::Reader<BufReader<File>>,
 }
 
 /// Extracts the facet values of each faceted field of each document.
diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
index 79cf4c7fe..92564b4cd 100644
--- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use grenad::Sorter;
 
@@ -21,7 +21,7 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
 pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs
index 139e8230a..285a4bdba 100644
--- a/milli/src/update/index_documents/extract/extract_geo_points.rs
+++ b/milli/src/update/index_documents/extract/extract_geo_points.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use concat_arrays::concat_arrays;
 use serde_json::Value;
@@ -18,7 +18,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     primary_key_id: FieldId,
     (lat_fid, lng_fid): (FieldId, FieldId),
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let mut writer = create_writer(
diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs
index dd5a4d3a7..863bc07c3 100644
--- a/milli/src/update/index_documents/extract/extract_vector_points.rs
+++ b/milli/src/update/index_documents/extract/extract_vector_points.rs
@@ -1,6 +1,6 @@
 use std::convert::TryFrom;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use bytemuck::cast_slice;
 use serde_json::{from_slice, Value};
@@ -18,7 +18,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     primary_key_id: FieldId,
     vectors_fid: FieldId,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let mut writer = create_writer(
diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs
index f1656d024..f211f7023 100644
--- a/milli/src/update/index_documents/extract/extract_word_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_docids.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 use std::iter::FromIterator;
 
 use roaring::RoaringBitmap;
@@ -26,7 +26,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
     exact_attributes: &HashSet<FieldId>,
-) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
+) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_word_fid_docids.rs b/milli/src/update/index_documents/extract/extract_word_fid_docids.rs
index aaf8fad79..09f571038 100644
--- a/milli/src/update/index_documents/extract/extract_word_fid_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_fid_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use super::helpers::{
     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@@ -14,7 +14,7 @@ use crate::{relative_from_absolute_position, DocumentId, Result};
 pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
index 4c910f32e..9ddd5ff4c 100644
--- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
@@ -1,6 +1,7 @@
 use std::cmp::Ordering;
 use std::collections::{BinaryHeap, HashMap};
 use std::fs::File;
+use std::io::BufReader;
 use std::{cmp, io, mem, str, vec};
 
 use super::helpers::{
@@ -20,7 +21,7 @@ use crate::{DocumentId, Result};
 pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
index e945833e6..94139ddf8 100644
--- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use super::helpers::{
     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@@ -17,7 +17,7 @@ use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Resu
 pub fn extract_word_position_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index c3a023e71..f44eac8f5 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -12,6 +12,7 @@ mod extract_word_position_docids;
 
 use std::collections::HashSet;
 use std::fs::File;
+use std::io::BufReader;
 
 use crossbeam_channel::Sender;
 use log::debug;
@@ -39,8 +40,8 @@ use crate::{FieldId, Result};
 /// Send data in grenad file over provided Sender.
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn data_from_obkv_documents(
-    original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
-    flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
+    original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
+    flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: Option<HashSet<FieldId>>,
@@ -152,7 +153,7 @@ pub(crate) fn data_from_obkv_documents(
         });
     }
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -162,7 +163,7 @@ pub(crate) fn data_from_obkv_documents(
         "word-pair-proximity-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -172,7 +173,11 @@ pub(crate) fn data_from_obkv_documents(
         "field-id-wordcount-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
+    spawn_extraction_task::<
+        _,
+        _,
+        Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)>,
+    >(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -185,7 +190,7 @@ pub(crate) fn data_from_obkv_documents(
         "word-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -194,7 +199,7 @@ pub(crate) fn data_from_obkv_documents(
         TypedChunk::WordPositionDocids,
         "word-position-docids",
     );
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks,
         indexer,
         lmdb_writer_sx.clone(),
@@ -204,7 +209,7 @@ pub(crate) fn data_from_obkv_documents(
         "word-fid-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_fid_facet_strings_chunks,
         indexer,
         lmdb_writer_sx.clone(),
@@ -214,7 +219,7 @@ pub(crate) fn data_from_obkv_documents(
         "field-id-facet-string-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_fid_facet_numbers_chunks,
         indexer,
         lmdb_writer_sx,
@@ -269,7 +274,7 @@ fn spawn_extraction_task<FE, FS, M>(
 /// Extract chunked data and send it into lmdb_writer_sx sender:
 /// - documents
 fn send_original_documents_data(
-    original_documents_chunk: Result<grenad::Reader<File>>,
+    original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     vectors_field_id: Option<FieldId>,
@@ -311,7 +316,7 @@ fn send_original_documents_data(
 #[allow(clippy::too_many_arguments)]
 #[allow(clippy::type_complexity)]
 fn send_and_extract_flattened_documents_data(
-    flattened_documents_chunk: Result<grenad::Reader<File>>,
+    flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: &Option<HashSet<FieldId>>,
@@ -328,7 +333,10 @@ fn send_and_extract_flattened_documents_data(
         grenad::Reader<CursorClonableMmap>,
         (
             grenad::Reader<CursorClonableMmap>,
-            (grenad::Reader<File>, (grenad::Reader<File>, grenad::Reader<File>)),
+            (
+                grenad::Reader<BufReader<File>>,
+                (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>),
+            ),
         ),
     ),
 )> {
diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs
index d5f5ac0bd..582bf2a5b 100644
--- a/milli/src/update/index_documents/helpers/grenad_helpers.rs
+++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs
@@ -1,6 +1,6 @@
 use std::borrow::Cow;
 use std::fs::File;
-use std::io::{self, Seek};
+use std::io::{self, BufReader, BufWriter, Seek};
 use std::time::Instant;
 
 use grenad::{CompressionType, Sorter};
@@ -17,13 +17,13 @@ pub fn create_writer<R: io::Write>(
     typ: grenad::CompressionType,
     level: Option<u32>,
     file: R,
-) -> grenad::Writer<R> {
+) -> grenad::Writer<BufWriter<R>> {
     let mut builder = grenad::Writer::builder();
     builder.compression_type(typ);
     if let Some(level) = level {
         builder.compression_level(level);
     }
-    builder.build(file)
+    builder.build(BufWriter::new(file))
 }
 
 pub fn create_sorter(
@@ -53,7 +53,7 @@
 pub fn sorter_into_reader(
     sorter: grenad::Sorter<MergeFn>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     let mut writer = create_writer(
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
@@ -64,16 +64,18 @@ pub fn sorter_into_reader(
     writer_into_reader(writer)
 }
 
-pub fn writer_into_reader(writer: grenad::Writer<File>) -> Result<grenad::Reader<File>> {
-    let mut file = writer.into_inner()?;
+pub fn writer_into_reader(
+    writer: grenad::Writer<BufWriter<File>>,
+) -> Result<grenad::Reader<BufReader<File>>> {
+    let mut file = writer.into_inner()?.into_inner().map_err(|err| err.into_error())?;
     file.rewind()?;
-    grenad::Reader::new(file).map_err(Into::into)
+    grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
 }
 
 pub unsafe fn as_cloneable_grenad(
-    reader: &grenad::Reader<File>,
+    reader: &grenad::Reader<BufReader<File>>,
 ) -> Result<grenad::Reader<CursorClonableMmap>> {
-    let file = reader.get_ref();
+    let file = reader.get_ref().get_ref();
     let mmap = memmap2::Mmap::map(file)?;
     let cursor = io::Cursor::new(ClonableMmap::from(mmap));
     let reader = grenad::Reader::new(cursor)?;
@@ -89,8 +91,8 @@ where
     fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
 }
 
-impl MergeableReader for Vec<grenad::Reader<File>> {
-    type Output = grenad::Reader<File>;
+impl MergeableReader for Vec<grenad::Reader<BufReader<File>>> {
+    type Output = grenad::Reader<BufReader<File>>;
 
     fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
         let mut merger = MergerBuilder::new(merge_fn);
@@ -99,8 +101,8 @@ impl MergeableReader for Vec<grenad::Reader<File>> {
     }
 }
 
-impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>)> {
-    type Output = (grenad::Reader<File>, grenad::Reader<File>);
+impl MergeableReader for Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
+    type Output = (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>);
 
     fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
         let mut m1 = MergerBuilder::new(merge_fn);
@@ -125,7 +127,7 @@ impl<R: io::Read + io::Seek> MergerBuilder<R> {
         Ok(())
     }
 
-    fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<File>> {
+    fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<BufReader<File>>> {
         let merger = self.0.build();
         let mut writer = create_writer(
             params.chunk_compression_type,
@@ -176,7 +178,7 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
     reader: grenad::Reader<R>,
     indexer: GrenadParameters,
     documents_chunk_size: usize,
-) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> {
+) -> Result<impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>>> {
     let mut continue_reading = true;
     let mut cursor = reader.into_cursor()?;
 
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index 7a0c811a8..f0e3bbbf0 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -659,8 +659,10 @@ impl<'a, 'i> Transform<'a, 'i> {
             new_documents_ids: self.new_documents_ids,
             replaced_documents_ids: self.replaced_documents_ids,
             documents_count: self.documents_count,
-            original_documents,
-            flattened_documents,
+            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
+            flattened_documents: flattened_documents
+                .into_inner()
+                .map_err(|err| err.into_error())?,
         })
     }
 
@@ -779,8 +781,10 @@ impl<'a, 'i> Transform<'a, 'i> {
             new_documents_ids: documents_ids,
             replaced_documents_ids: RoaringBitmap::default(),
             documents_count,
-            original_documents,
-            flattened_documents,
+            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
+            flattened_documents: flattened_documents
+                .into_inner()
+                .map_err(|err| err.into_error())?,
         };
 
         let new_facets = output.compute_real_facets(wtxn, self.index)?;
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index 788aaf93d..5895a69c5 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -2,7 +2,7 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use bytemuck::allocation::pod_collect_to_vec;
 use charabia::{Language, Script};
@@ -27,22 +27,22 @@ pub(crate) enum TypedChunk {
     FieldIdDocidFacetStrings(grenad::Reader<CursorClonableMmap>),
     FieldIdDocidFacetNumbers(grenad::Reader<CursorClonableMmap>),
     Documents(grenad::Reader<CursorClonableMmap>),
-    FieldIdWordcountDocids(grenad::Reader<File>),
+    FieldIdWordcountDocids(grenad::Reader<BufReader<File>>),
     NewDocumentsIds(RoaringBitmap),
     WordDocids {
-        word_docids_reader: grenad::Reader<File>,
-        exact_word_docids_reader: grenad::Reader<File>,
+        word_docids_reader: grenad::Reader<BufReader<File>>,
+        exact_word_docids_reader: grenad::Reader<BufReader<File>>,
     },
-    WordPositionDocids(grenad::Reader<File>),
-    WordFidDocids(grenad::Reader<File>),
-    WordPairProximityDocids(grenad::Reader<File>),
-    FieldIdFacetStringDocids(grenad::Reader<File>),
-    FieldIdFacetNumberDocids(grenad::Reader<File>),
-    FieldIdFacetExistsDocids(grenad::Reader<File>),
-    FieldIdFacetIsNullDocids(grenad::Reader<File>),
-    FieldIdFacetIsEmptyDocids(grenad::Reader<File>),
-    GeoPoints(grenad::Reader<File>),
-    VectorPoints(grenad::Reader<File>),
+    WordPositionDocids(grenad::Reader<BufReader<File>>),
+    WordFidDocids(grenad::Reader<BufReader<File>>),
+    WordPairProximityDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetStringDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetNumberDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetExistsDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
+    GeoPoints(grenad::Reader<BufReader<File>>),
+    VectorPoints(grenad::Reader<BufReader<File>>),
     ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
 }
 
diff --git a/milli/src/update/prefix_word_pairs/mod.rs b/milli/src/update/prefix_word_pairs/mod.rs
index 3105b16e4..e3135d546 100644
--- a/milli/src/update/prefix_word_pairs/mod.rs
+++ b/milli/src/update/prefix_word_pairs/mod.rs
@@ -1,6 +1,6 @@
 use std::borrow::Cow;
 use std::collections::HashSet;
-use std::io::BufReader;
+use std::io::{BufReader, BufWriter};
 
 use grenad::CompressionType;
 use heed::types::ByteSlice;
@@ -119,9 +119,9 @@ pub fn insert_into_database(
 pub fn write_into_lmdb_database_without_merging(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
-    writer: grenad::Writer<std::fs::File>,
+    writer: grenad::Writer<BufWriter<std::fs::File>>,
 ) -> Result<()> {
-    let file = writer.into_inner()?;
+    let file = writer.into_inner()?.into_inner().map_err(|err| err.into_error())?;
     let reader = grenad::Reader::new(BufReader::new(file))?;
     if database.is_empty(wtxn)? {
         let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
diff --git a/milli/tests/search/distinct.rs b/milli/tests/search/distinct.rs
index d8291ee30..e1876286c 100644
--- a/milli/tests/search/distinct.rs
+++ b/milli/tests/search/distinct.rs
@@ -8,7 +8,7 @@ use Criterion::*;
 use crate::search::{self, EXTERNAL_DOCUMENTS_IDS};
 
 macro_rules! test_distinct {
-    ($func:ident, $distinct:ident, $exhaustive:ident, $limit:expr, $criteria:expr, $n_res:expr) => {
+    ($func:ident, $distinct:ident, $exhaustive:ident, $limit:expr, $offset:expr, $criteria:expr, $n_res:expr) => {
         #[test]
         fn $func() {
             let criteria = $criteria;
@@ -27,6 +27,7 @@ macro_rules! test_distinct {
             let mut search = Search::new(&rtxn, &index);
             search.query(search::TEST_QUERY);
             search.limit($limit);
+            search.offset($offset);
             search.exhaustive_number_hits($exhaustive);
             search.terms_matching_strategy(TermsMatchingStrategy::default());
 
@@ -47,6 +48,7 @@ macro_rules! test_distinct {
                         Some(d.id)
                     }
                 })
+                .skip($offset)
                 .take($limit)
                 .collect();
 
@@ -61,6 +63,7 @@ test_distinct!(
     tag,
     true,
     1,
+    0,
     vec![Words, Typo, Proximity, Attribute, Exactness],
     3
 );
@@ -69,6 +72,7 @@ test_distinct!(
     asc_desc_rank,
     true,
     1,
+    0,
     vec![Words, Typo, Proximity, Attribute, Exactness],
     7
 );
@@ -77,6 +81,7 @@ test_distinct!(
     asc_desc_rank,
     true,
     0,
+    0,
     vec![Desc(S("attribute_rank")), Desc(S("exactness_rank")), Exactness, Typo],
     7
 );
@@ -86,6 +91,7 @@ test_distinct!(
     tag,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Typo, Proximity, Attribute, Exactness],
     3
 );
@@ -94,6 +100,7 @@ test_distinct!(
     asc_desc_rank,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Typo, Proximity, Attribute, Exactness],
     7
 );
@@ -102,6 +109,7 @@ test_distinct!(
     tag,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words],
     3
 );
@@ -110,6 +118,7 @@ test_distinct!(
     asc_desc_rank,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words],
     7
 );
@@ -118,6 +127,7 @@ test_distinct!(
     tag,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Typo],
     3
 );
@@ -126,6 +136,7 @@ test_distinct!(
     asc_desc_rank,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Typo],
     7
 );
@@ -134,6 +145,7 @@ test_distinct!(
     tag,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Proximity],
     3
 );
@@ -142,6 +154,7 @@ test_distinct!(
     asc_desc_rank,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Proximity],
     7
 );
@@ -150,6 +163,7 @@ test_distinct!(
     tag,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Attribute],
     3
 );
@@ -158,6 +172,7 @@ test_distinct!(
     asc_desc_rank,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Attribute],
     7
 );
@@ -166,6 +181,7 @@ test_distinct!(
     tag,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Exactness],
     3
 );
@@ -174,6 +190,47 @@ test_distinct!(
     asc_desc_rank,
     false,
     EXTERNAL_DOCUMENTS_IDS.len(),
+    0,
     vec![Words, Exactness],
     7
 );
+test_distinct!(
+    // testing: https://github.com/meilisearch/meilisearch/issues/4078
+    distinct_string_limit_and_offset,
+    tag,
+    false,
+    EXTERNAL_DOCUMENTS_IDS.len(),
+    1,
+    vec![],
+    2
+);
+test_distinct!(
+    // testing: https://github.com/meilisearch/meilisearch/issues/4078
+    exhaustive_distinct_string_limit_and_offset,
+    tag,
+    true,
+    1,
+    2,
+    vec![],
+    1
+);
+test_distinct!(
+    // testing: https://github.com/meilisearch/meilisearch/issues/4078
+    distinct_number_limit_and_offset,
+    asc_desc_rank,
+    false,
+    EXTERNAL_DOCUMENTS_IDS.len(),
+    2,
+    vec![],
+    5
+);
+test_distinct!(
+    // testing: https://github.com/meilisearch/meilisearch/issues/4078
+    exhaustive_distinct_number_limit_and_offset,
+    asc_desc_rank,
+    true,
+    2,
+    4,
+    vec![],
+    3
+);