Expose an experimental parameter to control the generation of prefix dbs

Clément Renault 2024-09-16 10:57:52 +02:00
parent 882663bf7f
commit 8cb7001755
8 changed files with 67 additions and 34 deletions
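
In short: the prefix databases precompute, for every frequent word prefix, the set of documents that prefix matches, which makes prefix search fast at the cost of extra indexing work. The commit threads a new `compute_prefix_databases` toggle (on by default) from the user-facing `experimental_disable_prefix_db` option through the `IndexScheduler` down into milli's `IndexDocumentsConfig`. A minimal sketch of how a caller opts out, assuming only the milli update API touched below (the helper function name is hypothetical):

    use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};

    // Hedged sketch: an indexing configuration that skips building the prefix
    // databases. `compute_prefix_databases` defaults to `true`; `false` makes
    // the indexer clear existing prefix databases instead of rebuilding them.
    fn config_without_prefix_dbs() -> IndexDocumentsConfig {
        IndexDocumentsConfig {
            update_method: IndexDocumentsMethod::ReplaceDocuments,
            compute_prefix_databases: false,
            ..Default::default()
        }
    }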

View File

@@ -1288,7 +1288,11 @@ impl IndexScheduler {
                 }
             }

-        let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
+        let config = IndexDocumentsConfig {
+            update_method: method,
+            compute_prefix_databases: self.compute_prefix_databases,
+            ..Default::default()
+        };

         let embedder_configs = index.embedding_configs(index_wtxn)?;
         // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense)
@@ -1398,6 +1402,7 @@ impl IndexScheduler {
                 let deleted_documents = delete_document_by_filter(
                     index_wtxn,
                     filter,
+                    self.compute_prefix_databases,
                     self.index_mapper.indexer_config(),
                     self.must_stop_processing.clone(),
                     index,
@@ -1638,6 +1643,7 @@ impl IndexScheduler {
 fn delete_document_by_filter<'a>(
     wtxn: &mut RwTxn<'a>,
     filter: &serde_json::Value,
+    compute_prefix_databases: bool,
     indexer_config: &IndexerConfig,
     must_stop_processing: MustStopProcessing,
     index: &'a Index,
@@ -1653,6 +1659,7 @@ fn delete_document_by_filter<'a>(
         let config = IndexDocumentsConfig {
             update_method: IndexDocumentsMethod::ReplaceDocuments,
+            compute_prefix_databases,
             ..Default::default()
         };
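
Worth noting: deleting documents by filter re-indexes through the same `IndexDocuments` machinery as document updates, so `delete_document_by_filter` gains the flag as an explicit parameter and forwards it into the `IndexDocumentsConfig` it builds.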

View File

@@ -32,6 +32,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         features: _,
         max_number_of_tasks: _,
         max_number_of_batched_tasks: _,
+        compute_prefix_databases: _,
         wake_up: _,
         dumps_path: _,
         snapshots_path: _,

View File

@@ -276,6 +276,8 @@ pub struct IndexSchedulerOptions {
     pub max_number_of_batched_tasks: usize,
     /// The experimental features enabled for this instance.
     pub instance_features: InstanceTogglableFeatures,
+    /// An experimental option to control the generation of prefix databases.
+    pub compute_prefix_databases: bool,
 }

 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -283,19 +285,13 @@ pub struct IndexSchedulerOptions {
 pub struct IndexScheduler {
     /// The LMDB environment which the DBs are associated with.
     pub(crate) env: Env,
     /// A boolean that can be set to true to stop the currently processing tasks.
     pub(crate) must_stop_processing: MustStopProcessing,
     /// The list of tasks currently processing
     pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
     /// The list of files referenced by the tasks
     pub(crate) file_store: FileStore,
     // The main database, it contains all the tasks accessible by their Id.
     pub(crate) all_tasks: Database<BEU32, SerdeJson<Task>>,
     /// All the tasks ids grouped by their status.
     // TODO we should not be able to serialize a `Status::Processing` in this database.
     pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
@@ -303,58 +299,43 @@ pub struct IndexScheduler {
     pub(crate) kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
     /// Store the tasks associated to an index.
     pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
     /// Store the tasks that were canceled by a task uid
     pub(crate) canceled_by: Database<BEU32, RoaringBitmapCodec>,
     /// Store the task ids of tasks which were enqueued at a specific date
     pub(crate) enqueued_at: Database<BEI128, CboRoaringBitmapCodec>,
     /// Store the task ids of finished tasks which started being processed at a specific date
     pub(crate) started_at: Database<BEI128, CboRoaringBitmapCodec>,
     /// Store the task ids of tasks which finished at a specific date
     pub(crate) finished_at: Database<BEI128, CboRoaringBitmapCodec>,
     /// In charge of creating, opening, storing and returning indexes.
     pub(crate) index_mapper: IndexMapper,
     /// In charge of fetching and setting the status of experimental features.
     features: features::FeatureData,
     /// Get a signal when a batch needs to be processed.
     pub(crate) wake_up: Arc<SignalEvent>,
     /// Whether auto-batching is enabled or not.
     pub(crate) autobatching_enabled: bool,
     /// Whether we should automatically cleanup the task queue or not.
     pub(crate) cleanup_enabled: bool,
     /// The max number of tasks allowed before the scheduler starts to delete
     /// the finished tasks automatically.
     pub(crate) max_number_of_tasks: usize,
     /// The maximum number of tasks that will be batched together.
     pub(crate) max_number_of_batched_tasks: usize,
+    /// Control whether we must generate the prefix databases or not.
+    pub(crate) compute_prefix_databases: bool,
     /// The webhook url we should send tasks to after processing every batches.
     pub(crate) webhook_url: Option<String>,
     /// The Authorization header to send to the webhook URL.
     pub(crate) webhook_authorization_header: Option<String>,
     /// The path used to create the dumps.
     pub(crate) dumps_path: PathBuf,
     /// The path used to create the snapshots.
     pub(crate) snapshots_path: PathBuf,
     /// The path to the folder containing the auth LMDB env.
     pub(crate) auth_path: PathBuf,
     /// The path to the version file of Meilisearch.
     pub(crate) version_file_path: PathBuf,
     embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>,
     // ================= test
@@ -364,13 +345,11 @@ pub struct IndexScheduler {
     /// See [self.breakpoint()](`IndexScheduler::breakpoint`) for an explanation.
     #[cfg(test)]
     test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
     /// A list of planned failures within the [`tick`](IndexScheduler::tick) method of the index scheduler.
     ///
     /// The first field is the iteration index and the second field identifies a location in the code.
     #[cfg(test)]
     planned_failures: Vec<(usize, tests::FailureLocation)>,
     /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
     #[cfg(test)]
     run_loop_iteration: Arc<RwLock<usize>>,
@@ -397,6 +376,7 @@ impl IndexScheduler {
             cleanup_enabled: self.cleanup_enabled,
             max_number_of_tasks: self.max_number_of_tasks,
             max_number_of_batched_tasks: self.max_number_of_batched_tasks,
+            compute_prefix_databases: self.compute_prefix_databases,
             snapshots_path: self.snapshots_path.clone(),
             dumps_path: self.dumps_path.clone(),
             auth_path: self.auth_path.clone(),
@@ -499,6 +479,7 @@ impl IndexScheduler {
             cleanup_enabled: options.cleanup_enabled,
             max_number_of_tasks: options.max_number_of_tasks,
             max_number_of_batched_tasks: options.max_number_of_batched_tasks,
+            compute_prefix_databases: options.compute_prefix_databases,
             dumps_path: options.dumps_path,
             snapshots_path: options.snapshots_path,
             auth_path: options.auth_path,
@@ -1819,6 +1800,7 @@ mod tests {
             max_number_of_tasks: 1_000_000,
             max_number_of_batched_tasks: usize::MAX,
             instance_features: Default::default(),
+            compute_prefix_databases: true,
         };
         configuration(&mut options);

View File

@@ -256,6 +256,7 @@ struct Infos {
     experimental_enable_logs_route: bool,
     experimental_reduce_indexing_memory_usage: bool,
     experimental_max_number_of_batched_tasks: usize,
+    experimental_disable_prefix_db: bool,
     gpu_enabled: bool,
     db_path: bool,
     import_dump: bool,
@@ -298,6 +299,7 @@ impl From<Opt> for Infos {
             experimental_enable_logs_route,
             experimental_reduce_indexing_memory_usage,
             experimental_max_number_of_batched_tasks,
+            experimental_disable_prefix_db,
             http_addr,
             master_key: _,
             env,
@@ -347,6 +349,7 @@ impl From<Opt> for Infos {
             experimental_replication_parameters,
             experimental_enable_logs_route,
             experimental_reduce_indexing_memory_usage,
+            experimental_disable_prefix_db,
             gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
             db_path: db_path != PathBuf::from("./data.ms"),
             import_dump: import_dump.is_some(),

View File

@@ -311,6 +311,7 @@ fn open_or_create_database_unchecked(
         index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
         index_count: DEFAULT_INDEX_COUNT,
         instance_features,
+        compute_prefix_databases: !opt.experimental_disable_prefix_db,
     })?)
 };
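
Note the inverted polarity: the user-facing option is a *disable* flag that defaults to `false`, while the internal scheduler option is the positive `compute_prefix_databases`, so an instance started without the flag keeps generating prefix databases exactly as before.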

View File

@@ -60,6 +60,7 @@ const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
     "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
 const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
     "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS";
+const MEILI_EXPERIMENTAL_DISABLE_PREFIX_DB: &str = "MEILI_EXPERIMENTAL_DISABLE_PREFIX_DB";

 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
@@ -389,6 +390,11 @@ pub struct Opt {
     #[serde(default = "default_limit_batched_tasks")]
     pub experimental_max_number_of_batched_tasks: usize,

+    /// Experimentally disable the prefix database, see: <https://github.com/orgs/meilisearch/discussions>
+    #[clap(long, env = MEILI_EXPERIMENTAL_DISABLE_PREFIX_DB)]
+    #[serde(default)]
+    pub experimental_disable_prefix_db: bool,
+
     #[serde(flatten)]
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
@@ -489,6 +495,7 @@ impl Opt {
             experimental_enable_logs_route,
             experimental_replication_parameters,
             experimental_reduce_indexing_memory_usage,
+            experimental_disable_prefix_db,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -518,6 +525,10 @@ impl Opt {
             MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
             experimental_max_number_of_batched_tasks.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_DISABLE_PREFIX_DB,
+            experimental_disable_prefix_db.to_string(),
+        );
         if let Some(ssl_cert_path) = ssl_cert_path {
             export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path);
         }
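
With clap's derive naming, the new field should surface as a `--experimental-disable-prefix-db` command-line flag and, via the constant above, as the `MEILI_EXPERIMENTAL_DISABLE_PREFIX_DB` environment variable; the `export_to_env_if_not_present` call mirrors how the other experimental options propagate their parsed value back into the environment.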

View File

@@ -1230,6 +1230,11 @@ impl Index {
         )
     }

+    /// Deletes the FST which is the words prefixes dictionary of the engine.
+    pub fn delete_words_prefixes_fst(&self, wtxn: &mut RwTxn) -> heed::Result<bool> {
+        self.main.remap_key_type::<Str>().delete(wtxn, main_key::WORDS_PREFIXES_FST_KEY)
+    }
+
     /// Returns the FST which is the words prefixes dictionary of the engine.
     pub fn words_prefixes_fst<'t>(&self, rtxn: &'t RoTxn) -> Result<fst::Set<Cow<'t, [u8]>>> {
         match self.main.remap_types::<Str, Bytes>().get(rtxn, main_key::WORDS_PREFIXES_FST_KEY)? {
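
A minimal sketch of the new helper next to the existing getter, assuming an open milli `Index` and a heed write transaction as in the surrounding code (the wrapper name is hypothetical):

    use milli::Index;

    // Hedged sketch: drop the words-prefixes FST inside a caller-provided
    // write transaction. `delete_words_prefixes_fst` returns `true` when an
    // FST entry existed in the main database and was removed; subsequent
    // `words_prefixes_fst` reads then see no prefix dictionary.
    fn wipe_prefix_dictionary(index: &Index, wtxn: &mut heed::RwTxn) -> milli::Result<bool> {
        Ok(index.delete_words_prefixes_fst(wtxn)?)
    }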

View File

@@ -85,7 +85,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
     embedders: EmbeddingConfigs,
 }

-#[derive(Default, Debug, Clone)]
+#[derive(Debug, Clone)]
 pub struct IndexDocumentsConfig {
     pub words_prefix_threshold: Option<u32>,
     pub max_prefix_length: Option<usize>,
@@ -93,6 +93,21 @@ pub struct IndexDocumentsConfig {
     pub words_positions_min_level_size: Option<NonZeroU32>,
     pub update_method: IndexDocumentsMethod,
     pub autogenerate_docids: bool,
+    pub compute_prefix_databases: bool,
+}
+
+impl Default for IndexDocumentsConfig {
+    fn default() -> Self {
+        Self {
+            words_prefix_threshold: Default::default(),
+            max_prefix_length: Default::default(),
+            words_positions_level_group_size: Default::default(),
+            words_positions_min_level_size: Default::default(),
+            update_method: Default::default(),
+            autogenerate_docids: Default::default(),
+            compute_prefix_databases: true,
+        }
+    }
 }

 impl<'t, 'i, 'a, FP, FA> IndexDocuments<'t, 'i, 'a, FP, FA>
@@ -558,12 +573,20 @@ where
             .map_err(InternalError::from)??;
         }

-        self.execute_prefix_databases(
-            word_docids.map(MergerBuilder::build),
-            exact_word_docids.map(MergerBuilder::build),
-            word_position_docids.map(MergerBuilder::build),
-            word_fid_docids.map(MergerBuilder::build),
-        )?;
+        if self.config.compute_prefix_databases {
+            self.execute_prefix_databases(
+                word_docids.map(MergerBuilder::build),
+                exact_word_docids.map(MergerBuilder::build),
+                word_position_docids.map(MergerBuilder::build),
+                word_fid_docids.map(MergerBuilder::build),
+            )?;
+        } else {
+            self.index.delete_words_prefixes_fst(self.wtxn)?;
+            self.index.word_prefix_docids.clear(self.wtxn)?;
+            self.index.exact_word_prefix_docids.clear(self.wtxn)?;
+            self.index.word_prefix_position_docids.clear(self.wtxn)?;
+            self.index.word_prefix_fid_docids.clear(self.wtxn)?;
+        }

         Ok(number_of_documents)
     }
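
Skipping the computation alone would not be safe: an index whose prefix databases were populated by an earlier run would keep answering prefix queries from stale entries that no longer match the documents. Hence the `else` branch deletes the words-prefixes FST and clears the four word-prefix databases whenever `compute_prefix_databases` is off.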