From 55e15529577d2b04244d554188112fab3d85d1d2 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Tue, 22 Dec 2020 17:13:50 +0100
Subject: [PATCH] update queue refactor, first iteration

---
 Cargo.lock              |   5 +
 Cargo.toml              |   8 +-
 src/option.rs           |   7 +-
 src/updates/mod.rs      | 336 +++++++++++++++++++++++++++++++++++++++-
 src/updates/settings.rs |  12 +-
 5 files changed, 357 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0b7386594..610c83318 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1142,6 +1142,7 @@ dependencies = [
  "lmdb-rkv-sys",
  "once_cell",
  "page_size",
+ "serde",
  "synchronoise",
  "url",
  "zerocopy",
@@ -1527,6 +1528,7 @@ dependencies = [
  "actix-rt",
  "actix-service",
  "actix-web",
+ "anyhow",
  "assert-json-diff",
  "byte-unit",
  "bytes 0.6.0",
@@ -1535,6 +1537,8 @@ dependencies = [
  "env_logger 0.8.2",
  "flate2",
  "futures",
+ "grenad",
+ "heed",
  "http",
  "indexmap",
  "jemallocator",
@@ -1545,6 +1549,7 @@ dependencies = [
  "mime",
  "once_cell",
  "rand 0.7.3",
+ "rayon",
  "regex",
  "rustls 0.18.1",
  "sentry",
diff --git a/Cargo.toml b/Cargo.toml
index 150005d66..cc25f0c8c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,21 +18,26 @@ actix-http = "2"
 actix-rt = "1"
 actix-service = "1.0.6"
 actix-web = { version = "3.3.2", features = ["rustls"] }
+anyhow = "1.0.36"
 byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
 bytes = "0.6.0"
 chrono = { version = "0.4.19", features = ["serde"] }
 crossbeam-channel = "0.5.0"
 env_logger = "0.8.2"
-flate2 = "1.0.18"
+flate2 = "1.0.19"
 futures = "0.3.7"
+grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
+heed = "0.10.6"
 http = "0.2.1"
 indexmap = { version = "1.3.2", features = ["serde-1"] }
 log = "0.4.8"
 main_error = "0.1.0"
+meilisearch-error = { path = "../MeiliSearch/meilisearch-error" }
 milli = { path = "../milli" }
 mime = "0.3.16"
 once_cell = "1.5.2"
 rand = "0.7.3"
+rayon = "1.5.0"
 regex = "1.4.2"
 rustls = "0.18"
 serde = { version = "1.0", features = ["derive"] }
@@ -48,7 +53,6 @@ tokio = "*"
 ureq = { version = "1.5.1", default-features = false, features = ["tls"] }
 walkdir = "2.3.1"
 whoami = "1.0.0"
-meilisearch-error = { path = "../MeiliSearch/meilisearch-error" }
 
 [dependencies.sentry]
 default-features = false
diff --git a/src/option.rs b/src/option.rs
index cf505e1fe..f9e98f4fa 100644
--- a/src/option.rs
+++ b/src/option.rs
@@ -11,13 +11,15 @@ use rustls::{
 };
 use structopt::StructOpt;
 
+use crate::updates::IndexerOpts;
+
 const POSSIBLE_ENV: [&str; 2] = ["development", "production"];
 
 #[derive(Debug, Clone, StructOpt)]
 pub struct Opt {
     /// The destination where the database must be created.
     #[structopt(long, env = "MEILI_DB_PATH", default_value = "./data.ms")]
-    pub db_path: String,
+    pub db_path: PathBuf,
 
     /// The address on which the http server will listen.
     #[structopt(long, env = "MEILI_HTTP_ADDR", default_value = "127.0.0.1:7700")]
@@ -132,6 +134,9 @@ pub struct Opt {
     /// The batch size used in the importation process, the bigger it is the faster the dump is created.
     #[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
     pub dump_batch_size: usize,
+
+    #[structopt(flatten)]
+    pub indexer_options: IndexerOpts,
 }
 
 impl Opt {
diff --git a/src/updates/mod.rs b/src/updates/mod.rs
index 249bec6d2..dd76ed1aa 100644
--- a/src/updates/mod.rs
+++ b/src/updates/mod.rs
@@ -2,7 +2,22 @@ mod settings;
 
 pub use settings::{Settings, Facets};
 
+use std::io;
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::Result;
+use flate2::read::GzDecoder;
+use grenad::CompressionType;
+use byte_unit::Byte;
+use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, UpdateIndexingStep::*};
+use milli::{UpdateStore, UpdateHandler as Handler, Index};
+use rayon::ThreadPool;
 use serde::{Serialize, Deserialize};
+use tokio::sync::broadcast;
+use structopt::StructOpt;
+
+use crate::option::Opt;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(tag = "type")]
@@ -13,5 +28,322 @@ enum UpdateMeta {
     Facets(Facets),
 }
 
-#[derive(Clone, Debug)]
-pub struct UpdateQueue;
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type")]
+enum UpdateMetaProgress {
+    DocumentsAddition {
+        step: usize,
+        total_steps: usize,
+        current: usize,
+        total: Option<usize>,
+    },
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "type")]
+enum UpdateStatus<M, P, N> {
+    Pending { update_id: u64, meta: M },
+    Progressing { update_id: u64, meta: P },
+    Processed { update_id: u64, meta: N },
+    Aborted { update_id: u64, meta: M },
+}
+
+#[derive(Clone)]
+pub struct UpdateQueue {
+    inner: Arc<UpdateStore<UpdateMeta, String>>,
+}
+
+
+#[derive(Debug, Clone, StructOpt)]
+pub struct IndexerOpts {
+    /// The number of documents to skip before printing
+    /// a log regarding the indexing advancement.
+    #[structopt(long, default_value = "100000")] // 100k
+    pub log_every_n: usize,
+
+    /// MTBL max number of chunks in bytes.
+    #[structopt(long)]
+    pub max_nb_chunks: Option<usize>,
+
+    /// The maximum amount of memory to use for the MTBL buffer. It is recommended
+    /// to use something like 80%-90% of the available memory.
+    ///
+    /// It is automatically split by the number of jobs, e.g. if you use 7 jobs
+    /// and 7 GB of max memory, each thread will use a maximum of 1 GB.
+    #[structopt(long, default_value = "7 GiB")]
+    pub max_memory: Byte,
+
+    /// Size of the linked hash map cache when indexing.
+    /// The bigger it is, the faster the indexing is but the more memory it takes.
+    #[structopt(long, default_value = "500")]
+    pub linked_hash_map_size: usize,
+
+    /// The name of the compression algorithm to use when compressing intermediate
+    /// chunks during indexing documents.
+    ///
+    /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
+    #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
+    pub chunk_compression_type: CompressionType,
+
+    /// The level of compression of the chosen algorithm.
+    #[structopt(long, requires = "chunk-compression-type")]
+    pub chunk_compression_level: Option<u32>,
+
+    /// The number of bytes to remove from the beginning of the chunks while reading/sorting
+    /// or merging them.
+    ///
+    /// File fusing must only be enabled on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`
+    /// flag (i.e. ext4 and XFS). File fusing will only work if `enable-chunk-fusing` is set.
+    #[structopt(long, default_value = "4 GiB")]
+    pub chunk_fusing_shrink_size: Byte,
+
+    /// Enable the chunk fusing or not; this reduces the amount of disk space used by a factor of 2.
+    #[structopt(long)]
+    pub enable_chunk_fusing: bool,
+
+    /// Number of parallel jobs for indexing, defaults to # of CPUs.
+    #[structopt(long)]
+    pub indexing_jobs: Option<usize>,
+}
+
+type UpdateSender = broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>;
+
+struct UpdateHandler {
+    indexes: Arc<Index>,
+    max_nb_chunks: Option<usize>,
+    chunk_compression_level: Option<u32>,
+    thread_pool: ThreadPool,
+    log_frequency: usize,
+    max_memory: usize,
+    linked_hash_map_size: usize,
+    chunk_compression_type: CompressionType,
+    chunk_fusing_shrink_size: u64,
+    update_status_sender: UpdateSender,
+}
+
+impl UpdateHandler {
+    fn new(
+        opt: &IndexerOpts,
+        indexes: Arc<Index>,
+        update_status_sender: UpdateSender,
+    ) -> Result<Self> {
+        let thread_pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(opt.indexing_jobs.unwrap_or(0))
+            .build()?;
+        Ok(Self {
+            indexes,
+            max_nb_chunks: opt.max_nb_chunks,
+            chunk_compression_level: opt.chunk_compression_level,
+            thread_pool,
+            log_frequency: opt.log_every_n,
+            max_memory: opt.max_memory.get_bytes() as usize,
+            linked_hash_map_size: opt.linked_hash_map_size,
+            chunk_compression_type: opt.chunk_compression_type,
+            chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
+            update_status_sender,
+        })
+    }
+
+    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
+        // We prepare the update by using the update builder.
+        let mut update_builder = UpdateBuilder::new(update_id);
+        if let Some(max_nb_chunks) = self.max_nb_chunks {
+            update_builder.max_nb_chunks(max_nb_chunks);
+        }
+        if let Some(chunk_compression_level) = self.chunk_compression_level {
+            update_builder.chunk_compression_level(chunk_compression_level);
+        }
+        update_builder.thread_pool(&self.thread_pool);
+        update_builder.log_every_n(self.log_frequency);
+        update_builder.max_memory(self.max_memory);
+        update_builder.linked_hash_map_size(self.linked_hash_map_size);
+        update_builder.chunk_compression_type(self.chunk_compression_type);
+        update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
+        update_builder
+    }
+
+    fn update_documents(
+        &self,
+        format: String,
+        method: String,
+        content: &[u8],
+        update_builder: UpdateBuilder,
+    ) -> Result<()> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.indexes.write_txn()?;
+        let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes);
+
+        match format.as_str() {
+            "csv" => builder.update_format(UpdateFormat::Csv),
+            "json" => builder.update_format(UpdateFormat::Json),
+            "json-stream" => builder.update_format(UpdateFormat::JsonStream),
+            otherwise => panic!("invalid update format {:?}", otherwise),
+        };
+
+        match method.as_str() {
+            "replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
+            "update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
+            otherwise => panic!("invalid indexing method {:?}", otherwise),
+        };
+
+        let gzipped = true;
+        let reader = if gzipped {
+            Box::new(GzDecoder::new(content))
+        } else {
+            Box::new(content) as Box<dyn io::Read>
+        };
+
+        let result = builder.execute(reader, |indexing_step, update_id| {
+            let (current, total) = match indexing_step {
+                TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
+                ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
+                IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
+                MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
+            };
+            let _ = self.update_status_sender.send(UpdateStatus::Progressing {
+                update_id,
+                meta: UpdateMetaProgress::DocumentsAddition {
+                    step: indexing_step.step(),
+                    total_steps: indexing_step.number_of_steps(),
+                    current,
+                    total,
+                }
+            });
+        });
+
+        match result {
+            Ok(()) => wtxn.commit().map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+
+    fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<()> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.indexes.write_txn()?;
+        let builder = update_builder.clear_documents(&mut wtxn, &self.indexes);
+
+        match builder.execute() {
+            Ok(_count) => wtxn.commit().map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+
+    fn update_settings(&self, settings: Settings, update_builder: UpdateBuilder) -> Result<()> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.indexes.write_txn()?;
+        let mut builder = update_builder.settings(&mut wtxn, &self.indexes);
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(names) = settings.searchable_attributes {
+            match names {
+                Some(names) => builder.set_searchable_fields(names),
+                None => builder.reset_searchable_fields(),
+            }
+        }
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(names) = settings.displayed_attributes {
+            match names {
+                Some(names) => builder.set_displayed_fields(names),
+                None => builder.reset_displayed_fields(),
+            }
+        }
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(facet_types) = settings.faceted_attributes {
+            builder.set_faceted_fields(facet_types);
+        }
+
+        // We transpose the settings JSON struct into a real setting update.
+        if let Some(criteria) = settings.criteria {
+            match criteria {
+                Some(criteria) => builder.set_criteria(criteria),
+                None => builder.reset_criteria(),
+            }
+        }
+
+        let result = builder.execute(|indexing_step, update_id| {
+            let (current, total) = match indexing_step {
+                TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
+                ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
+                IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
+                MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
+            };
+            let _ = self.update_status_sender.send(UpdateStatus::Progressing {
+                update_id,
+                meta: UpdateMetaProgress::DocumentsAddition {
+                    step: indexing_step.step(),
+                    total_steps: indexing_step.number_of_steps(),
+                    current,
+                    total,
+                }
+            });
+        });
+
+        match result {
+            Ok(_count) => wtxn.commit().map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+
+    fn update_facets(&self, levels: Facets, update_builder: UpdateBuilder) -> Result<()> {
+        // We must use the write transaction of the update here.
+        let mut wtxn = self.indexes.write_txn()?;
+        let mut builder = update_builder.facets(&mut wtxn, &self.indexes);
+        if let Some(value) = levels.level_group_size {
+            builder.level_group_size(value);
+        }
+        if let Some(value) = levels.min_level_size {
+            builder.min_level_size(value);
+        }
+        match builder.execute() {
+            Ok(()) => wtxn.commit().map_err(Into::into),
+            Err(e) => Err(e.into())
+        }
+    }
+}
+
+impl Handler<UpdateMeta, String> for UpdateHandler {
+    fn handle_update(&mut self, update_id: u64, meta: UpdateMeta, content: &[u8]) -> heed::Result<String> {
+        use UpdateMeta::*;
+
+        let update_builder = self.update_builder(update_id);
+
+        let result: anyhow::Result<()> = match meta {
+            DocumentsAddition { method, format } => {
+                self.update_documents(format, method, content, update_builder)
+            },
+            ClearDocuments => self.clear_documents(update_builder),
+            Settings(settings) => self.update_settings(settings, update_builder),
+            Facets(levels) => self.update_facets(levels, update_builder),
+        };
+
+        let meta = match result {
+            Ok(()) => format!("valid update content"),
+            Err(e) => format!("error while processing update content: {:?}", e),
+        };
+
+        let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
+        let _ = self.update_status_sender.send(processed);
+
+        Ok(meta)
+    }
+}
+
+impl UpdateQueue {
+    pub fn new<P: AsRef<Path>>(
+        opt: Opt,
+        indexes: Arc<Index>,
+    ) -> Result<Self> {
+        let (sender, _) = broadcast::channel(100);
+        let handler = UpdateHandler::new(&opt.indexer_options, indexes, sender)?;
+        let size = opt.max_udb_size.get_bytes() as usize;
+        let path = opt.db_path.join("updates.mdb");
+        let inner = UpdateStore::open(
+            Some(size),
+            path,
+            handler
+        )?;
+        Ok(Self { inner })
+    }
+}
diff --git a/src/updates/settings.rs b/src/updates/settings.rs
index 55c0cccf2..58414152a 100644
--- a/src/updates/settings.rs
+++ b/src/updates/settings.rs
@@ -20,24 +20,24 @@ pub struct Settings {
         deserialize_with = "deserialize_some",
         skip_serializing_if = "Option::is_none",
     )]
-    displayed_attributes: Option<Option<Vec<String>>>,
+    pub displayed_attributes: Option<Option<Vec<String>>>,
 
     #[serde(
         default,
         deserialize_with = "deserialize_some",
         skip_serializing_if = "Option::is_none",
     )]
-    searchable_attributes: Option<Option<Vec<String>>>,
+    pub searchable_attributes: Option<Option<Vec<String>>>,
 
     #[serde(default)]
-    faceted_attributes: Option<HashMap<String, String>>,
+    pub faceted_attributes: Option<HashMap<String, String>>,
 
     #[serde(
         default,
         deserialize_with = "deserialize_some",
        skip_serializing_if = "Option::is_none",
     )]
-    criteria: Option<Option<Vec<String>>>,
+    pub criteria: Option<Option<Vec<String>>>,
 }
 
 
@@ -45,7 +45,7 @@ pub struct Settings {
 #[serde(deny_unknown_fields)]
 #[serde(rename_all = "camelCase")]
 pub struct Facets {
-    level_group_size: Option<NonZeroUsize>,
-    min_level_size: Option<NonZeroUsize>,
+    pub level_group_size: Option<NonZeroUsize>,
+    pub min_level_size: Option<NonZeroUsize>,
 }
 