From c50d3edc4a0cb0cc55b6b59c906342aef2dbbcf5 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Tue, 3 Sep 2024 11:02:39 +0200
Subject: [PATCH] Integrate first searchable extractor

---
 milli/src/update/new/channel.rs                |  17 +-
 milli/src/update/new/document_change.rs        |  26 +-
 milli/src/update/new/extract/cache.rs          |   4 +-
 .../update/new/extract/extract_word_docids.rs  | 250 +++++++++++-----
 milli/src/update/new/extract/mod.rs            |   4 +
 .../update/new/extract/tokenize_document.rs    | 268 +++++++++++++-----
 milli/src/update/new/indexer/mod.rs            |  18 +-
 milli/src/update/new/merger.rs                 |   6 +-
 milli/src/update/new/mod.rs                    |   7 +-
 9 files changed, 419 insertions(+), 181 deletions(-)

diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs
index 4041fcc6a..d94b2cc00 100644
--- a/milli/src/update/new/channel.rs
+++ b/milli/src/update/new/channel.rs
@@ -1,10 +1,12 @@
 use std::fs::File;
 
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
+use grenad::Merger;
 use heed::types::Bytes;
 
 use super::StdResult;
 use crate::update::new::KvReaderFieldId;
+use crate::update::MergeDeladdCboRoaringBitmaps;
 use crate::{DocumentId, Index};
 
 /// The capacity of the channel is currently in number of messages.
@@ -159,7 +161,7 @@ impl DocumentSender {
 }
 
 pub enum MergerOperation {
-    WordDocidsCursors(Vec<grenad::ReaderCursor<File>>),
+    WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
 }
 
 pub struct MergerReceiver(Receiver<MergerOperation>);
@@ -175,3 +177,16 @@ impl IntoIterator for MergerReceiver {
 
 #[derive(Clone)]
 pub struct DeladdCboRoaringBitmapSender(Sender<MergerOperation>);
+
+impl DeladdCboRoaringBitmapSender {
+    pub fn word_docids(
+        &self,
+        merger: Merger<File, MergeDeladdCboRoaringBitmaps>,
+    ) -> StdResult<(), SendError<()>> {
+        let operation = MergerOperation::WordDocidsMerger(merger);
+        match self.0.send(operation) {
+            Ok(()) => Ok(()),
+            Err(SendError(_)) => Err(SendError(())),
+        }
+    }
+}
diff --git a/milli/src/update/new/document_change.rs b/milli/src/update/new/document_change.rs
index 6f9d767cb..9076f32db 100644
--- a/milli/src/update/new/document_change.rs
+++ b/milli/src/update/new/document_change.rs
@@ -2,7 +2,7 @@ use heed::RoTxn;
 use obkv::KvReader;
 
 use crate::update::new::KvReaderFieldId;
-use crate::{DocumentId, FieldId};
+use crate::{DocumentId, FieldId, Index};
 
 pub enum DocumentChange {
     Deletion(Deletion),
@@ -12,14 +12,14 @@ pub enum DocumentChange {
 
 pub struct Deletion {
     docid: DocumentId,
-    external_docid: String, // ?
-    current: Box<KvReaderFieldId>,
+    external_docid: String,        // ?
+    current: Box<KvReaderFieldId>, // ?
 }
 
 pub struct Update {
     docid: DocumentId,
-    external_docid: String, // ?
-    current: Box<KvReaderFieldId>,
+    external_docid: String,        // ?
+    current: Box<KvReaderFieldId>, // ?
     new: Box<KvReaderFieldId>,
 }
 
@@ -30,7 +30,7 @@ pub struct Insertion {
 }
 
 impl DocumentChange {
-    fn docid(&self) -> DocumentId {
+    pub fn docid(&self) -> DocumentId {
         match &self {
             Self::Deletion(inner) => inner.docid(),
             Self::Update(inner) => inner.docid(),
@@ -48,11 +48,11 @@ impl Deletion {
         Self { docid, external_docid, current }
     }
 
-    fn docid(&self) -> DocumentId {
+    pub fn docid(&self) -> DocumentId {
         self.docid
     }
 
-    fn current(&self, rtxn: &RoTxn) -> &KvReader<FieldId> {
+    pub fn current(&self, rtxn: &RoTxn, index: &Index) -> &KvReader<FieldId> {
         unimplemented!()
     }
 }
@@ -62,11 +62,11 @@ impl Insertion {
         Insertion { docid, external_docid, new }
     }
 
-    fn docid(&self) -> DocumentId {
+    pub fn docid(&self) -> DocumentId {
         self.docid
     }
 
-    fn new(&self) -> &KvReader<FieldId> {
+    pub fn new(&self) -> &KvReader<FieldId> {
         unimplemented!()
     }
 }
@@ -81,15 +81,15 @@ impl Update {
         Update { docid, external_docid, current, new }
     }
 
-    fn docid(&self) -> DocumentId {
+    pub fn docid(&self) -> DocumentId {
         self.docid
     }
 
-    fn current(&self, rtxn: &RoTxn) -> &KvReader<FieldId> {
+    pub fn current(&self, rtxn: &RoTxn, index: &Index) -> &KvReader<FieldId> {
         unimplemented!()
     }
 
-    fn new(&self) -> &KvReader<FieldId> {
+    pub fn new(&self) -> &KvReader<FieldId> {
         unimplemented!()
     }
 }
diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs
index 0d72a5a8d..878150eb3 100644
--- a/milli/src/update/new/extract/cache.rs
+++ b/milli/src/update/new/extract/cache.rs
@@ -2,12 +2,12 @@ use std::borrow::Cow;
 use std::num::NonZeroUsize;
 use std::{io, mem};
 
-use grenad2::{MergeFunction, Sorter};
+use grenad::{MergeFunction, Sorter};
 use lru::LruCache;
 use roaring::RoaringBitmap;
 use smallvec::SmallVec;
 
-use crate::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 
 #[derive(Debug)]
 pub struct CachedSorter<MF> {
diff --git a/milli/src/update/new/extract/extract_word_docids.rs b/milli/src/update/new/extract/extract_word_docids.rs
index e2e1520bc..e2261748a 100644
--- a/milli/src/update/new/extract/extract_word_docids.rs
+++ b/milli/src/update/new/extract/extract_word_docids.rs
@@ -1,84 +1,180 @@
-pub fn extract_word_docids(
-    document_change: DocumentChange,
-    _tokenizer: &Tokenizer,
-    output: &mut CachedSorter<DelAddRoaringBitmapMerger>,
-) -> grenad::Result<(), io::Error> {
-    match document_change {
-        DocumentChange::Deletion(inner) => {
-            unimplemented!()
-        }
-        DocumentChange::Update(inner) => {
-            unimplemented!()
-        }
-        DocumentChange::Insertion(inner) => {
-            unimplemented!()
+use std::fs::File;
+
+use charabia::TokenizerBuilder;
+use grenad::Merger;
+use grenad::ReaderCursor;
+use heed::RoTxn;
+use rayon::iter::IntoParallelIterator;
+use rayon::iter::ParallelBridge;
+use rayon::iter::ParallelIterator;
+
+use crate::update::MergeDeladdCboRoaringBitmaps;
+use crate::{
+    update::{
+        create_sorter,
+        new::{DocumentChange, ItemsPool},
+        GrenadParameters,
+    },
+    FieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE,
+};
+
+use super::{
+    cache::{CachedSorter, DelAddRoaringBitmapMerger},
+    tokenize_document::DocumentTokenizer,
+};
+
+pub trait SearchableExtractor {
+    fn run_extraction(
+        index: &Index,
+        fields_ids_map: &FieldsIdsMap,
+        indexer: GrenadParameters,
+        document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
+    ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
+        let max_memory = indexer.max_memory_by_thread();
+
+        let rtxn = index.read_txn()?;
+        let stop_words = index.stop_words(&rtxn)?;
+        let allowed_separators = index.allowed_separators(&rtxn)?;
+        let allowed_separators: Option<Vec<_>> =
+            allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect());
+        let dictionary = index.dictionary(&rtxn)?;
+        let dictionary: Option<Vec<_>> =
+            dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
+        let builder = tokenizer_builder(
+            stop_words.as_ref(),
+            allowed_separators.as_deref(),
+            dictionary.as_deref(),
+        );
+        let tokenizer = builder.into_tokenizer();
+
+        let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn)?;
+        let localized_attributes_rules =
+            index.localized_attributes_rules(&rtxn)?.unwrap_or_default();
+
+        let document_tokenizer = DocumentTokenizer {
+            tokenizer: &tokenizer,
+            searchable_attributes: user_defined_searchable_fields.as_deref(),
+            localized_attributes_rules: &localized_attributes_rules,
+            max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
+        };
+
+        let context_pool = ItemsPool::new(|| {
+            Ok((
+                index.read_txn()?,
+                &document_tokenizer,
+                CachedSorter::new(
+                    // TODO use a better value
+                    100.try_into().unwrap(),
+                    create_sorter(
+                        grenad::SortAlgorithm::Stable,
+                        DelAddRoaringBitmapMerger,
+                        indexer.chunk_compression_type,
+                        indexer.chunk_compression_level,
+                        indexer.max_nb_chunks,
+                        max_memory,
+                    ),
+                ),
+            ))
+        });
+
+        document_changes.into_par_iter().try_for_each(|document_change| {
+            context_pool.with(|(rtxn, document_tokenizer, cached_sorter)| {
+                Self::extract_document_change(
+                    &*rtxn,
+                    index,
+                    document_tokenizer,
+                    &fields_ids_map,
+                    cached_sorter,
+                    document_change?,
+                )
+            })
+        })?;
+
+        let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
+        for (_rtxn, _tokenizer, cache) in context_pool.into_items() {
+            let sorter = cache.into_sorter()?;
+            let readers = sorter.into_reader_cursors()?;
+            builder.extend(readers);
         }
+
+        Ok(builder.build())
     }
-    let normalizer_options = NormalizerOption::default();
-
-    if let Some(previous_doc) = previous_doc {
-        for (_, v) in previous_doc.iter() {
-            // Only manage the direct JSON strings
-            // TODO manage the JSON strings correctly (escaped chars)
-            if v.first().zip(v.last()) == Some((&b'"', &b'"')) {
-                let s = std::str::from_utf8(&v[1..v.len() - 1]).unwrap();
-                // for token in tokenizer.tokenize(s).filter(|t| t.is_word()) {
-                //     let key = token.lemma().normalize(&normalizer_options);
-                for token in s.split_whitespace() {
-                    let key = token.normalize(&normalizer_options);
-                    output.insert_del_u32(key.as_bytes(), docid)?;
-                }
-            }
-        }
-    }
-
-    for (_, v) in new_doc.iter() {
-        // Only manage the direct JSON strings
-        // TODO manage the JSON strings correctly (escaped chars)
-        if v.first().zip(v.last()) == Some((&b'"', &b'"')) {
-            let s = std::str::from_utf8(&v[1..v.len() - 1]).unwrap();
-            // for token in tokenizer.tokenize(s).filter(|t| t.is_word()) {
-            //     let key = token.lemma().normalize(&normalizer_options);
-            for token in s.split_whitespace() {
-                let key = token.normalize(&normalizer_options);
-                output.insert_add_u32(key.as_bytes(), docid)?;
-            }
-        }
-    }
-
-    Ok(())
+    fn extract_document_change(
+        rtxn: &RoTxn,
+        index: &Index,
+        document_tokenizer: &DocumentTokenizer,
+        fields_ids_map: &FieldsIdsMap,
+        cached_sorter: &mut CachedSorter<DelAddRoaringBitmapMerger>,
+        document_change: DocumentChange,
+    ) -> Result<()>;
 }
 
-/// take an iterator on tokens and compute their relative position depending on separator kinds
-/// if it's an `Hard` separator we add an additional relative proximity of 8 between words,
-/// else we keep the standard proximity of 1 between words.
-fn process_tokens<'a>(
-    tokens: impl Iterator<Item = Token<'a>>,
-) -> impl Iterator<Item = (usize, Token<'a>)> {
-    tokens
-        .skip_while(|token| token.is_separator())
-        .scan((0, None), |(offset, prev_kind), mut token| {
-            match token.kind {
-                TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => {
-                    *offset += match *prev_kind {
-                        Some(TokenKind::Separator(SeparatorKind::Hard)) => 8,
-                        Some(_) => 1,
-                        None => 0,
-                    };
-                    *prev_kind = Some(token.kind)
-                }
-                TokenKind::Separator(SeparatorKind::Hard) => {
-                    *prev_kind = Some(token.kind);
-                }
-                TokenKind::Separator(SeparatorKind::Soft)
-                    if *prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) =>
-                {
-                    *prev_kind = Some(token.kind);
-                }
-                _ => token.kind = TokenKind::Unknown,
+pub struct WordDocidsExtractor;
+impl SearchableExtractor for WordDocidsExtractor {
+    fn extract_document_change(
+        rtxn: &RoTxn,
+        index: &Index,
+        document_tokenizer: &DocumentTokenizer,
+        fields_ids_map: &FieldsIdsMap,
+        // TODO: DelAddRoaringBitmapMerger should be CBO
+        cached_sorter: &mut CachedSorter<DelAddRoaringBitmapMerger>,
+        document_change: DocumentChange,
+    ) -> crate::Result<()> {
+        match document_change {
+            DocumentChange::Deletion(inner) => {
+                let mut token_fn = |_fid, _pos: u16, word: &str| {
+                    cached_sorter.insert_del_u32(word.as_bytes(), inner.docid()).unwrap();
+                };
+                document_tokenizer.tokenize_document(
+                    inner.current(rtxn, index),
+                    fields_ids_map,
+                    &mut token_fn,
+                )?;
             }
-            Some((*offset, token))
-        })
-        .filter(|(_, t)| t.is_word())
+            DocumentChange::Update(inner) => {
+                let mut token_fn = |_fid, _pos, word: &str| {
+                    cached_sorter.insert_del_u32(word.as_bytes(), inner.docid()).unwrap();
+                };
+                document_tokenizer.tokenize_document(
+                    inner.current(rtxn, index),
+                    fields_ids_map,
+                    &mut token_fn,
+                )?;
+
+                let mut token_fn = |_fid, _pos, word: &str| {
+                    cached_sorter.insert_add_u32(word.as_bytes(), inner.docid()).unwrap();
+                };
+                document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
+            }
+            DocumentChange::Insertion(inner) => {
+                let mut token_fn = |_fid, _pos, word: &str| {
+                    cached_sorter.insert_add_u32(word.as_bytes(), inner.docid()).unwrap();
+                };
+                document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Factorize tokenizer building.
+fn tokenizer_builder<'a>(
+    stop_words: Option<&'a fst::Set<&'a [u8]>>,
+    allowed_separators: Option<&'a [&str]>,
+    dictionary: Option<&'a [&str]>,
+) -> TokenizerBuilder<'a, &'a [u8]> {
+    let mut tokenizer_builder = TokenizerBuilder::new();
+    if let Some(stop_words) = stop_words {
+        tokenizer_builder.stop_words(stop_words);
+    }
+    if let Some(dictionary) = dictionary {
+        tokenizer_builder.words_dict(dictionary);
+    }
+    if let Some(separators) = allowed_separators {
+        tokenizer_builder.separators(separators);
+    }
+
+    tokenizer_builder
 }
diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs
index 26732d4c8..3124068d9 100644
--- a/milli/src/update/new/extract/mod.rs
+++ b/milli/src/update/new/extract/mod.rs
@@ -1,2 +1,6 @@
 mod cache;
 mod extract_word_docids;
+mod tokenize_document;
+
+pub use extract_word_docids::SearchableExtractor;
+pub use extract_word_docids::WordDocidsExtractor;
diff --git a/milli/src/update/new/extract/tokenize_document.rs b/milli/src/update/new/extract/tokenize_document.rs
index 8793063b0..40f0b4374 100644
--- a/milli/src/update/new/extract/tokenize_document.rs
+++ b/milli/src/update/new/extract/tokenize_document.rs
@@ -1,56 +1,71 @@
-pub struct DocumentTokenizer {
-    tokenizer: &Tokenizer,
-    searchable_attributes: Option<&[String]>,
-    localized_attributes_rules: &[LocalizedAttributesRule],
-    max_positions_per_attributes: u32,
+use crate::{
+    update::new::KvReaderFieldId, FieldId, FieldsIdsMap, Index, InternalError,
+    LocalizedAttributesRule, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH,
+};
+use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
+use heed::RoTxn;
+use serde_json::Value;
+use std::collections::HashMap;
+
+pub struct DocumentTokenizer<'a> {
+    pub tokenizer: &'a Tokenizer<'a>,
+    pub searchable_attributes: Option<&'a [&'a str]>,
+    pub localized_attributes_rules: &'a [LocalizedAttributesRule],
+    pub max_positions_per_attributes: u32,
 }
 
-impl DocumentTokenizer {
-    // pub fn new(tokenizer: &Tokenizer, settings: &InnerIndexSettings) -> Self {
-    //     Self { tokenizer, settings }
-    // }
-
-    pub fn tokenize_document<'a>(
-        obkv: &KvReader<'a, FieldId>,
+impl<'a> DocumentTokenizer<'a> {
+    pub fn tokenize_document(
+        &self,
+        obkv: &KvReaderFieldId,
         field_id_map: &FieldsIdsMap,
-        token_fn: impl Fn(FieldId, u16, &str),
-    ) {
-        let mut field_position = Hashmap::new();
+        token_fn: &mut impl FnMut(FieldId, u16, &str),
+    ) -> Result<()> {
+        let mut field_position = HashMap::new();
         for (field_id, field_bytes) in obkv {
-            let field_name = field_id_map.name(field_id);
+            let Some(field_name) = field_id_map.name(field_id) else {
+                unreachable!("field id not found in field id map");
+            };
+
+            let mut tokenize_field = |name: &str, value: &Value| {
+                let Some(field_id) = field_id_map.id(name) else {
+                    unreachable!("field name not found in field id map");
+                };
+
+                let position =
+                    field_position.entry(field_id).and_modify(|counter| *counter += 8).or_insert(0);
+                if *position as u32 >= self.max_positions_per_attributes {
+                    return;
+                }
 
-            let tokenize_field = |name, value| {
-                let field_id = field_id_map.id(name);
                 match value {
-                    Number(n) => {
+                    Value::Number(n) => {
                         let token = n.to_string();
-                        let position = field_position
-                            .entry(field_id)
-                            .and_modify(|counter| *counter += 8)
-                            .or_insert(0);
-                        token_fn(field_id, position, token.as_str());
+                        if let Ok(position) = (*position).try_into() {
+                            token_fn(field_id, position, token.as_str());
+                        }
                     }
-                    String(text) => {
+                    Value::String(text) => {
                         // create an iterator of token with their positions.
                         let locales = self
                             .localized_attributes_rules
                             .iter()
-                            .first(|rule| rule.match_str(field_name))
-                            .map(|rule| rule.locales(field_id));
-                        let tokens =
-                            process_tokens(tokenizer.tokenize_with_allow_list(field, locales))
-                                .take_while(|(p, _)| {
-                                    (*p as u32) < self.max_positions_per_attributes
-                                });
+                            .find(|rule| rule.match_str(field_name))
+                            .map(|rule| rule.locales());
+                        let tokens = process_tokens(
+                            *position,
+                            self.tokenizer.tokenize_with_allow_list(text.as_str(), locales),
+                        )
+                        .take_while(|(p, _)| (*p as u32) < self.max_positions_per_attributes);
 
                         for (index, token) in tokens {
                             // keep a word only if it is not empty and fit in a LMDB key.
                             let token = token.lemma().trim();
                             if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
-                                let position: u16 = index
-                                    .try_into()
-                                    .map_err(|_| SerializationError::InvalidNumberSerialization)?;
-                                writer.insert(position, token.as_bytes())?;
+                                *position = index;
+                                if let Ok(position) = (*position).try_into() {
+                                    token_fn(field_id, position, token);
+                                }
                             }
                         }
                     }
@@ -59,21 +74,28 @@ impl DocumentTokenizer {
             };
 
             // if the current field is searchable or contains a searchable attribute
-            if searchable_attributes.map_or(true, |attributes| {
-                attributes.iter().any(|name| contained_in(name, field_name))
+            if self.searchable_attributes.map_or(true, |attributes| {
+                attributes.iter().any(|name| perm_json_p::contained_in(name, field_name))
             }) {
                 // parse json.
                 match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? {
-                    Value::Object(object) => {
-                        seek_leaf_values_in_object(object, selectors, &field_name, tokenize_field)
-                    }
-                    Value::Array(array) => {
-                        seek_leaf_values_in_array(array, selectors, &field_name, tokenize_field)
-                    }
-                    value => tokenize_field(&base_key, value),
+                    Value::Object(object) => perm_json_p::seek_leaf_values_in_object(
+                        &object,
+                        self.searchable_attributes.as_deref(),
+                        &field_name,
+                        &mut tokenize_field,
+                    ),
+                    Value::Array(array) => perm_json_p::seek_leaf_values_in_array(
+                        &array,
+                        self.searchable_attributes.as_deref(),
+                        &field_name,
+                        &mut tokenize_field,
+                    ),
+                    value => tokenize_field(&field_name, &value),
                 }
             }
         }
+        Ok(())
     }
 }
 
@@ -81,11 +103,12 @@ impl DocumentTokenizer {
 /// if it's an `Hard` separator we add an additional relative proximity of 8 between words,
 /// else we keep the standard proximity of 1 between words.
 fn process_tokens<'a>(
+    start_offset: usize,
     tokens: impl Iterator<Item = Token<'a>>,
 ) -> impl Iterator<Item = (usize, Token<'a>)> {
     tokens
         .skip_while(|token| token.is_separator())
-        .scan((0, None), |(offset, prev_kind), mut token| {
+        .scan((start_offset, None), |(offset, prev_kind), mut token| {
             match token.kind {
                 TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => {
                     *offset += match *prev_kind {
@@ -110,42 +133,45 @@ fn process_tokens<'a>(
         .filter(|(_, t)| t.is_word())
 }
 
-/// Returns `true` if the `selector` match the `key`.
-///
-/// ```text
-/// Example:
-/// `animaux` match `animaux`
-/// `animaux.chien` match `animaux`
-/// `animaux.chien` match `animaux`
-/// `animaux.chien.nom` match `animaux`
-/// `animaux.chien.nom` match `animaux.chien`
-/// -----------------------------------------
-/// `animaux` doesn't match `animaux.chien`
-/// `animaux.` doesn't match `animaux`
-/// `animaux.ch` doesn't match `animaux.chien`
-/// `animau` doesn't match `animaux`
-/// ```
-fn contained_in(selector: &str, key: &str) -> bool {
-    selector.starts_with(key)
-        && selector[key.len()..].chars().next().map(|c| c == SPLIT_SYMBOL).unwrap_or(true)
-}
-
 /// TODO move in permissive json pointer
 mod perm_json_p {
+    use serde_json::{Map, Value};
+
     const SPLIT_SYMBOL: char = '.';
+
+    /// Returns `true` if the `selector` match the `key`.
+    ///
+    /// ```text
+    /// Example:
+    /// `animaux` match `animaux`
+    /// `animaux.chien` match `animaux`
+    /// `animaux.chien` match `animaux`
+    /// `animaux.chien.nom` match `animaux`
+    /// `animaux.chien.nom` match `animaux.chien`
+    /// -----------------------------------------
+    /// `animaux` doesn't match `animaux.chien`
+    /// `animaux.` doesn't match `animaux`
+    /// `animaux.ch` doesn't match `animaux.chien`
+    /// `animau` doesn't match `animaux`
+    /// ```
+    pub fn contained_in(selector: &str, key: &str) -> bool {
+        selector.starts_with(key)
+            && selector[key.len()..].chars().next().map(|c| c == SPLIT_SYMBOL).unwrap_or(true)
+    }
+
     pub fn seek_leaf_values<'a>(
         value: &Map<String, Value>,
         selectors: impl IntoIterator<Item = &'a str>,
-        seeker: impl Fn(&str, &Value),
+        seeker: &mut impl FnMut(&str, &Value),
     ) {
         let selectors: Vec<_> = selectors.into_iter().collect();
-        seek_leaf_values_in_object(value, &selectors, "", &seeker);
+        seek_leaf_values_in_object(value, Some(&selectors), "", seeker);
     }
 
     pub fn seek_leaf_values_in_object(
         value: &Map<String, Value>,
-        selectors: &[&str],
+        selectors: Option<&[&str]>,
         base_key: &str,
-        seeker: &impl Fn(&str, &Value),
+        seeker: &mut impl FnMut(&str, &Value),
     ) {
         for (key, value) in value.iter() {
             let base_key = if base_key.is_empty() {
@@ -156,8 +182,10 @@ mod perm_json_p {
 
             // here if the user only specified `doggo` we need to iterate in all the fields of `doggo`
             // so we check the contained_in on both side
-            let should_continue = selectors.iter().any(|selector| {
-                contained_in(selector, &base_key) || contained_in(&base_key, selector)
+            let should_continue = selectors.map_or(true, |selectors| {
+                selectors.iter().any(|selector| {
+                    contained_in(selector, &base_key) || contained_in(&base_key, selector)
+                })
             });
 
             if should_continue {
@@ -175,12 +203,12 @@ mod perm_json_p {
     }
 
     pub fn seek_leaf_values_in_array(
-        values: &mut [Value],
-        selectors: &[&str],
+        values: &[Value],
+        selectors: Option<&[&str]>,
         base_key: &str,
-        seeker: &impl Fn(&str, &Value),
+        seeker: &mut impl FnMut(&str, &Value),
     ) {
-        for value in values.iter_mut() {
+        for value in values {
             match value {
                 Value::Object(object) => {
                     seek_leaf_values_in_object(object, selectors, base_key, seeker)
@@ -193,3 +221,91 @@ mod perm_json_p {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use charabia::TokenizerBuilder;
+    use meili_snap::snapshot;
+    use obkv::KvReader;
+    use serde_json::json;
+    #[test]
+    fn test_tokenize_document() {
+        let mut fields_ids_map = FieldsIdsMap::new();
+
+        let field_1 = json!({
+            "name": "doggo",
+            "age": 10,
+        });
+
+        let field_2 = json!({
+            "catto": {
+                "name": "pesti",
+                "age": 23,
+            }
+        });
+
+        let field_3 = json!(["doggo", "catto"]);
+
+        let mut obkv = obkv::KvWriter::memory();
+        let field_1_id = fields_ids_map.insert("doggo").unwrap();
+        let field_1 = serde_json::to_string(&field_1).unwrap();
+        obkv.insert(field_1_id, field_1.as_bytes()).unwrap();
+        let field_2_id = fields_ids_map.insert("catto").unwrap();
+        let field_2 = serde_json::to_string(&field_2).unwrap();
+        obkv.insert(field_2_id, field_2.as_bytes()).unwrap();
+        let field_3_id = fields_ids_map.insert("doggo.name").unwrap();
+        let field_3 = serde_json::to_string(&field_3).unwrap();
+        obkv.insert(field_3_id, field_3.as_bytes()).unwrap();
+        let value = obkv.into_inner().unwrap();
+        let obkv = KvReader::from_slice(value.as_slice());
+
+        fields_ids_map.insert("doggo.age");
+        fields_ids_map.insert("catto.catto.name");
+        fields_ids_map.insert("catto.catto.age");
+
+        let mut tb = TokenizerBuilder::default();
+        let document_tokenizer = DocumentTokenizer {
+            tokenizer: &tb.build(),
+            searchable_attributes: None,
+            localized_attributes_rules: &[],
+            max_positions_per_attributes: 1000,
+        };
+
+        let mut words = std::collections::BTreeMap::new();
+        document_tokenizer
+            .tokenize_document(obkv, &fields_ids_map, &mut |fid, pos, word| {
+                words.insert([fid, pos], word.to_string());
+            })
+            .unwrap();
+
+        snapshot!(format!("{:#?}", words), @r###"
+        {
+            [
+                2,
+                0,
+            ]: "doggo",
+            [
+                2,
+                8,
+            ]: "doggo",
+            [
+                2,
+                16,
+            ]: "catto",
+            [
+                3,
+                0,
+            ]: "10",
+            [
+                4,
+                0,
+            ]: "pesti",
+            [
+                5,
+                0,
+            ]: "23",
+        }
+        "###);
+    }
+}
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index ca5bb71eb..ebbb8582c 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -15,11 +15,13 @@ use super::channel::{
     WriterOperation,
 };
 use super::document_change::DocumentChange;
+use super::extract::{SearchableExtractor, WordDocidsExtractor};
 use super::merger::merge_grenad_entries;
 use super::StdResult;
 use crate::documents::{
     obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY,
 };
+use crate::update::GrenadParameters;
 use crate::{Index, Result, UserError};
 
 mod document_deletion;
@@ -45,7 +47,7 @@ pub fn index(
     wtxn: &mut RwTxn,
     index: &Index,
     pool: &ThreadPool,
-    _document_changes: PI,
+    document_changes: PI,
 ) -> Result<()>
 where
     PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
@@ -59,10 +61,18 @@ where
         // TODO manage the errors correctly
        let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
            pool.in_place_scope(|_s| {
+                let document_changes = document_changes.into_par_iter();
                 // word docids
-                // document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>)
-                // let grenads = extractor_function(document_changes)?;
-                // deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?;
+                let merger = WordDocidsExtractor::run_extraction(
+                    index,
+                    todo!(),
+                    // TODO: GrenadParameters::default() should be removed in favor of a passed parameter
+                    GrenadParameters::default(),
+                    document_changes.clone(),
+                )?;
+
+                // TODO: manage the errors correctly
+                deladd_cbo_roaring_bitmap_sender.word_docids(merger).unwrap();
 
                 Ok(()) as Result<_>
             })
diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs
index 97f9e6ac6..89d0762f0 100644
--- a/milli/src/update/new/merger.rs
+++ b/milli/src/update/new/merger.rs
@@ -20,14 +20,12 @@ pub fn merge_grenad_entries(
     for merger_operation in receiver {
         match merger_operation {
-            MergerOperation::WordDocidsCursors(cursors) => {
+            MergerOperation::WordDocidsMerger(merger) => {
                 let sender = sender.word_docids();
                 let database = index.word_docids.remap_types::<Bytes, Bytes>();
-                let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
-                builder.extend(cursors);
 
                 /// TODO manage the error correctly
-                let mut merger_iter = builder.build().into_stream_merger_iter().unwrap();
+                let mut merger_iter = merger.into_stream_merger_iter().unwrap();
 
                 // TODO manage the error correctly
                 while let Some((key, deladd)) = merger_iter.next().unwrap() {
diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs
index ad61d8343..31a017c12 100644
--- a/milli/src/update/new/mod.rs
+++ b/milli/src/update/new/mod.rs
@@ -4,13 +4,12 @@ pub use items_pool::ItemsPool;
 use super::del_add::DelAdd;
 use crate::FieldId;
 
-mod document_change;
-mod merger;
-// mod extract;
 mod channel;
-//mod global_fields_ids_map;
+mod document_change;
+mod extract;
 pub mod indexer;
 mod items_pool;
+mod merger;
 
 /// TODO move them elsewhere
 pub type StdResult<T, E> = std::result::Result<T, E>;
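
Reviewer note (not part of the commit): the sketch below is a minimal, self-contained illustration of the extraction pattern that `SearchableExtractor::run_extraction` introduces — rayon workers fold document changes into a per-thread del/add cache, and the per-thread caches are merged at the end. It stands in `BTreeMap`s for milli's `CachedSorter`/`ItemsPool`/grenad `Merger` machinery, and `FakeChange` is a hypothetical stand-in for `DocumentChange`, so treat it as an illustration of the shape of the flow rather than milli's actual API.

use std::collections::{BTreeMap, BTreeSet};

use rayon::prelude::*;

// Hypothetical stand-in for milli's DocumentChange.
enum FakeChange {
    Insertion { docid: u32, words: Vec<String> },
    Deletion { docid: u32, words: Vec<String> },
}

// (deleted docids, added docids) for one word.
type DelAddDocids = (BTreeSet<u32>, BTreeSet<u32>);

fn main() {
    let changes = vec![
        FakeChange::Insertion { docid: 0, words: vec!["doggo".into(), "catto".into()] },
        FakeChange::Deletion { docid: 1, words: vec!["pesti".into()] },
    ];

    // Each rayon worker folds changes into its own cache (the role played by
    // ItemsPool + CachedSorter), then the per-thread caches are merged into a
    // single view (the role played by grenad's MergerBuilder/Merger).
    let merged: BTreeMap<String, DelAddDocids> = changes
        .into_par_iter()
        .fold(BTreeMap::new, |mut cache, change| {
            match change {
                FakeChange::Insertion { docid, words } => {
                    for word in words {
                        cache.entry(word).or_default().1.insert(docid);
                    }
                }
                FakeChange::Deletion { docid, words } => {
                    for word in words {
                        cache.entry(word).or_default().0.insert(docid);
                    }
                }
            }
            cache
        })
        .reduce(BTreeMap::new, |mut left, right| {
            for (word, (del, add)) in right {
                let entry = left.entry(word).or_default();
                entry.0.extend(del);
                entry.1.extend(add);
            }
            left
        });

    for (word, (del, add)) in merged {
        println!("{word}: del={del:?} add={add:?}");
    }
}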