Use the GlobalFieldsIdsMap everywhere and write it to disk

Co-authored-by: Dureuill <louis@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
This commit is contained in:
Clément Renault 2024-09-03 12:01:01 +02:00
parent c50d3edc4a
commit c1557734dc
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
11 changed files with 70 additions and 99 deletions

View File

@ -36,7 +36,7 @@ use meilisearch_types::milli::update::{
use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
use meilisearch_types::milli::{self, Filter, Object, UserError};
use meilisearch_types::milli::{self, Filter, GlobalFieldsIdsMap, Object, UserError};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@ -1302,49 +1302,6 @@ impl IndexScheduler {
let primary_key =
guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
// if let Some(primary_key) = primary_key {
// match index.primary_key(index_wtxn)? {
// // if a primary key was set AND had already been defined in the index
// // but to a different value, we can make the whole batch fail.
// Some(pk) => {
// if primary_key != pk {
// return Err(milli::Error::from(
// milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()),
// )
// .into());
// }
// }
// // if the primary key was set and there was no primary key set for this index
// // we set it to the received value before starting the indexing process.
// None => {
// todo!();
// let mut builder =
// milli::update::Settings::new(index_wtxn, index, indexer_config);
// builder.set_primary_key(primary_key);
// builder.execute(
// |indexing_step| tracing::debug!(update = ?indexing_step),
// || must_stop_processing.clone().get(),
// )?;
// primary_key_has_been_set = true;
// }
// }
// }
// let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
// let embedder_configs = index.embedding_configs(index_wtxn)?;
// // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense)
// let embedders = self.embedders(embedder_configs)?;
// let mut builder = milli::update::IndexDocuments::new(
// index_wtxn,
// index,
// indexer_config,
// config,
// |indexing_step| tracing::trace!(?indexing_step, "Update"),
// || must_stop_processing.get(),
// )?;
let mut indexer = indexer::DocumentOperation::new(method);
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
match operation {
@ -1401,12 +1358,10 @@ impl IndexScheduler {
// let pool = indexer_config.thread_pool.unwrap();
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
// let fields_ids_map = RwLock::new(fields_ids_map);
let param = (index, &rtxn, &mut fields_ids_map, &primary_key);
let document_changes = indexer.document_changes(param)?;
indexer::index(index_wtxn, index, &pool, document_changes)?;
/// TODO we must store it or not?
let fields_ids_map = fields_ids_map;
let param = (index, &rtxn, &primary_key);
let document_changes = indexer.document_changes(&mut fields_ids_map, param)?;
/// TODO pass/write the FieldsIdsMap
indexer::index(index_wtxn, index, fields_ids_map, &pool, document_changes)?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} else if primary_key_has_been_set {

View File

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::FieldId;
mod global;
pub use global::GlobalFieldsIdsMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldsIdsMap {

View File

@ -4,11 +4,13 @@ use std::sync::RwLock;
use crate::{FieldId, FieldsIdsMap};
/// A fields ids map that can be globally updated to add fields.
///
/// It pairs a borrowed, lock-protected `FieldsIdsMap` (`global`) with an owned
/// `LocalFieldsIdsMap` (`local`). Because `global` is a shared reference, cloning
/// this value yields another handle onto the same `RwLock`, with its own copy of `local`.
#[derive(Debug, Clone)]
pub struct GlobalFieldsIdsMap<'indexing> {
/// The shared map, borrowed for the `'indexing` lifetime and guarded by a `RwLock`.
global: &'indexing RwLock<FieldsIdsMap>,
/// The map owned by this handle.
/// NOTE(review): presumably a cache of entries already read from `global`,
/// so that lookups can avoid taking the lock — confirm against the impl.
local: LocalFieldsIdsMap,
}
#[derive(Debug, Clone)]
struct LocalFieldsIdsMap {
names_ids: BTreeMap<String, FieldId>,
ids_names: BTreeMap<FieldId, String>,

View File

@ -55,7 +55,7 @@ pub use self::error::{
};
pub use self::external_documents_ids::ExternalDocumentsIds;
pub use self::fieldids_weights_map::FieldidsWeightsMap;
pub use self::fields_ids_map::FieldsIdsMap;
pub use self::fields_ids_map::{FieldsIdsMap, GlobalFieldsIdsMap};
pub use self::heed_codec::{
BEU16StrCodec, BEU32StrCodec, BoRoaringBitmapCodec, BoRoaringBitmapLenCodec,
CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, FieldIdWordCountCodec, ObkvCodec,

View File

@ -1,32 +1,20 @@
use std::fs::File;
use charabia::TokenizerBuilder;
use grenad::Merger;
use grenad::ReaderCursor;
use grenad::{Merger, ReaderCursor};
use heed::RoTxn;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelBridge;
use rayon::iter::ParallelIterator;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{
update::{
create_sorter,
new::{DocumentChange, ItemsPool},
GrenadParameters,
},
FieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE,
};
use super::{
cache::{CachedSorter, DelAddRoaringBitmapMerger},
tokenize_document::DocumentTokenizer,
};
use super::cache::CachedSorter;
use super::tokenize_document::DocumentTokenizer;
use crate::update::new::{DocumentChange, ItemsPool};
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
pub trait SearchableExtractor {
fn run_extraction(
index: &Index,
fields_ids_map: &FieldsIdsMap,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
@ -62,12 +50,13 @@ pub trait SearchableExtractor {
Ok((
index.read_txn()?,
&document_tokenizer,
fields_ids_map.clone(),
CachedSorter::new(
// TODO use a better value
100.try_into().unwrap(),
create_sorter(
grenad::SortAlgorithm::Stable,
DelAddRoaringBitmapMerger,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
@ -78,12 +67,12 @@ pub trait SearchableExtractor {
});
document_changes.into_par_iter().try_for_each(|document_change| {
context_pool.with(|(rtxn, document_tokenizer, cached_sorter)| {
context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
Self::extract_document_change(
&*rtxn,
index,
document_tokenizer,
&fields_ids_map,
fields_ids_map,
cached_sorter,
document_change?,
)
@ -91,7 +80,7 @@ pub trait SearchableExtractor {
})?;
let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps);
for (_rtxn, _tokenizer, cache) in context_pool.into_items() {
for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
let sorter = cache.into_sorter()?;
let readers = sorter.into_reader_cursors()?;
builder.extend(readers);
@ -104,8 +93,8 @@ pub trait SearchableExtractor {
rtxn: &RoTxn,
index: &Index,
document_tokenizer: &DocumentTokenizer,
fields_ids_map: &FieldsIdsMap,
cached_sorter: &mut CachedSorter<DelAddRoaringBitmapMerger>,
fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut CachedSorter<MergeDeladdCboRoaringBitmaps>,
document_change: DocumentChange,
) -> Result<()>;
}
@ -116,9 +105,8 @@ impl SearchableExtractor for WordDocidsExtractor {
rtxn: &RoTxn,
index: &Index,
document_tokenizer: &DocumentTokenizer,
fields_ids_map: &FieldsIdsMap,
// TODO: DelAddRoaringBitmapMerger should be CBO
cached_sorter: &mut CachedSorter<DelAddRoaringBitmapMerger>,
fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut CachedSorter<MergeDeladdCboRoaringBitmaps>,
document_change: DocumentChange,
) -> crate::Result<()> {
match document_change {

View File

@ -1,11 +1,14 @@
use crate::{
update::new::KvReaderFieldId, FieldId, FieldsIdsMap, Index, InternalError,
LocalizedAttributesRule, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH,
};
use std::collections::HashMap;
use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
use heed::RoTxn;
use serde_json::Value;
use std::collections::HashMap;
use crate::update::new::KvReaderFieldId;
use crate::{
FieldId, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, LocalizedAttributesRule,
Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH,
};
pub struct DocumentTokenizer<'a> {
pub tokenizer: &'a Tokenizer<'a>,
@ -18,18 +21,24 @@ impl<'a> DocumentTokenizer<'a> {
pub fn tokenize_document(
&self,
obkv: &KvReaderFieldId,
field_id_map: &FieldsIdsMap,
field_id_map: &mut GlobalFieldsIdsMap,
token_fn: &mut impl FnMut(FieldId, u16, &str),
) -> Result<()> {
let mut field_position = HashMap::new();
let mut field_name = String::new();
for (field_id, field_bytes) in obkv {
let Some(field_name) = field_id_map.name(field_id) else {
let Some(field_name) = field_id_map.name(field_id).map(|s| {
field_name.clear();
field_name.push_str(s);
&field_name
}) else {
unreachable!("field id not found in field id map");
};
let mut tokenize_field = |name: &str, value: &Value| {
let Some(field_id) = field_id_map.id(name) else {
unreachable!("field name not found in field id map");
let Some(field_id) = field_id_map.id_or_insert(name) else {
/// TODO: better error
panic!("it's over 9000");
};
let position =
@ -75,7 +84,7 @@ impl<'a> DocumentTokenizer<'a> {
// if the current field is searchable or contains a searchable attribute
if self.searchable_attributes.map_or(true, |attributes| {
attributes.iter().any(|name| perm_json_p::contained_in(name, field_name))
attributes.iter().any(|name| perm_json_p::contained_in(name, &field_name))
}) {
// parse json.
match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? {
@ -224,11 +233,12 @@ mod perm_json_p {
#[cfg(test)]
mod test {
use super::*;
use charabia::TokenizerBuilder;
use meili_snap::snapshot;
use obkv::KvReader;
use serde_json::json;
use super::*;
#[test]
fn test_tokenize_document() {
let mut fields_ids_map = FieldsIdsMap::new();

View File

@ -27,6 +27,7 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion {
fn document_changes(
self,
_fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
let (index, fields, primary_key) = param;

View File

@ -73,13 +73,14 @@ impl DocumentOperation {
}
impl<'p> DocumentChanges<'p> for DocumentOperation {
type Parameter = (&'p Index, &'p RoTxn<'p>, &'p mut FieldsIdsMap, &'p PrimaryKey<'p>);
type Parameter = (&'p Index, &'p RoTxn<'p>, &'p PrimaryKey<'p>);
fn document_changes(
self,
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
let (index, rtxn, fields_ids_map, primary_key) = param;
let (index, rtxn, primary_key) = param;
let documents_ids = index.documents_ids(rtxn)?;
let mut available_docids = AvailableIds::new(&documents_ids);
@ -174,7 +175,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
/// TODO is it the best way to provide FieldsIdsMap to the parallel iterator?
let fields_ids_map = fields_ids_map.clone();
// We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
let docids_version_offsets: Vec<_> = docids_version_offsets.drain().collect();
Ok(docids_version_offsets

View File

@ -1,4 +1,5 @@
use std::fs::File;
use std::sync::RwLock;
use std::thread::{self, Builder};
use big_s::S;
@ -22,7 +23,7 @@ use crate::documents::{
obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY,
};
use crate::update::GrenadParameters;
use crate::{Index, Result, UserError};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
mod document_deletion;
mod document_operation;
@ -34,6 +35,7 @@ pub trait DocumentChanges<'p> {
/// Builds the stream of document changes to index.
///
/// Consumes `self` and receives mutable access to the `FieldsIdsMap` together with
/// the implementor-defined `Self::Parameter`. Implementors that do not need the
/// fields ids map may simply ignore it.
///
/// Returns a cloneable parallel iterator, valid for `'p`, whose items are
/// individually fallible `DocumentChange`s.
///
/// # Errors
///
/// The outer `Result` covers the construction of the iterator itself; each
/// yielded item carries its own `Result`.
fn document_changes(
self,
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p>;
}
@ -46,6 +48,7 @@ pub trait DocumentChanges<'p> {
pub fn index<PI>(
wtxn: &mut RwTxn,
index: &Index,
fields_ids_map: FieldsIdsMap,
pool: &ThreadPool,
document_changes: PI,
) -> Result<()>
@ -57,6 +60,9 @@ where
let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } =
extractors_merger_channels(100);
let fields_ids_map_lock = RwLock::new(fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
thread::scope(|s| {
// TODO manage the errors correctly
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
@ -65,7 +71,7 @@ where
// word docids
let merger = WordDocidsExtractor::run_extraction(
index,
todo!(),
&global_fields_ids_map,
/// TODO: GrenadParameters::default() should be removed in favor of a passed parameter
GrenadParameters::default(),
document_changes.clone(),
@ -100,8 +106,13 @@ where
handle.join().unwrap()?;
handle2.join().unwrap()?;
Ok(())
})
Ok(()) as Result<_>
})?;
let fields_ids_map = fields_ids_map_lock.into_inner().unwrap();
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
Ok(())
}
/// TODO move this elsewhere

View File

@ -30,6 +30,7 @@ where
/// - We recommend sending chunks of documents in this `PartialDumpIndexer`; we therefore need to create a custom take_while_size method (that doesn't drop items).
fn document_changes(
self,
_fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
let (fields_ids_map, concurrent_available_ids, primary_key) = param;

View File

@ -2,7 +2,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use super::DocumentChanges;
use crate::update::new::DocumentChange;
use crate::Result;
use crate::{FieldsIdsMap, Result};
/// Field-less marker type that implements `DocumentChanges`.
///
/// NOTE(review): judging by the name, this is meant to produce document changes
/// for function-driven updates — confirm. The implementation visible in this file
/// is still a stub whose iterator items are `todo!()`.
pub struct UpdateByFunction;
@ -11,6 +11,7 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction {
fn document_changes(
self,
_fields_ids_map: &mut FieldsIdsMap,
_param: Self::Parameter,
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
Ok((0..100).into_par_iter().map(|_| todo!()))