diff --git a/meilisearch-http/src/index_controller/actor_index_controller/update_handler.rs b/meilisearch-http/src/index_controller/actor_index_controller/update_handler.rs deleted file mode 100644 index d9ac2f866..000000000 --- a/meilisearch-http/src/index_controller/actor_index_controller/update_handler.rs +++ /dev/null @@ -1,260 +0,0 @@ -use std::collections::HashMap; -use std::io; -use std::fs::File; - -use anyhow::Result; -use flate2::read::GzDecoder; -use grenad::CompressionType; -use log::info; -use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; -use milli::Index; -use rayon::ThreadPool; - -use crate::index_controller::updates::{Failed, Processed, Processing}; -use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult}; -use crate::option::IndexerOpts; - -pub struct UpdateHandler { - max_nb_chunks: Option, - chunk_compression_level: Option, - thread_pool: ThreadPool, - log_frequency: usize, - max_memory: usize, - linked_hash_map_size: usize, - chunk_compression_type: CompressionType, - chunk_fusing_shrink_size: u64, -} - -impl UpdateHandler { - pub fn new( - opt: &IndexerOpts, - ) -> anyhow::Result { - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(opt.indexing_jobs.unwrap_or(0)) - .build()?; - Ok(Self { - max_nb_chunks: opt.max_nb_chunks, - chunk_compression_level: opt.chunk_compression_level, - thread_pool, - log_frequency: opt.log_every_n, - max_memory: opt.max_memory.get_bytes() as usize, - linked_hash_map_size: opt.linked_hash_map_size, - chunk_compression_type: opt.chunk_compression_type, - chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(), - }) - } - - fn update_buidler(&self, update_id: u64) -> UpdateBuilder { - // We prepare the update by using the update builder. - let mut update_builder = UpdateBuilder::new(update_id); - if let Some(max_nb_chunks) = self.max_nb_chunks { - update_builder.max_nb_chunks(max_nb_chunks); - } - if let Some(chunk_compression_level) = self.chunk_compression_level { - update_builder.chunk_compression_level(chunk_compression_level); - } - update_builder.thread_pool(&self.thread_pool); - update_builder.log_every_n(self.log_frequency); - update_builder.max_memory(self.max_memory); - update_builder.linked_hash_map_size(self.linked_hash_map_size); - update_builder.chunk_compression_type(self.chunk_compression_type); - update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size); - update_builder - } - - fn update_documents( - &self, - format: UpdateFormat, - method: IndexDocumentsMethod, - content: File, - update_builder: UpdateBuilder, - primary_key: Option<&str>, - index: &Index, - ) -> anyhow::Result { - info!("performing document addition"); - // We must use the write transaction of the update here. - let mut wtxn = index.write_txn()?; - - // Set the primary key if not set already, ignore if already set. - match (index.primary_key(&wtxn)?, primary_key) { - (None, Some(ref primary_key)) => { - index.put_primary_key(&mut wtxn, primary_key)?; - } - _ => (), - } - - let mut builder = update_builder.index_documents(&mut wtxn, index); - builder.update_format(format); - builder.index_documents_method(method); - - let gzipped = false; - let reader = if gzipped { - Box::new(GzDecoder::new(content)) - } else { - Box::new(content) as Box - }; - - let result = builder.execute(reader, |indexing_step, update_id| { - info!("update {}: {:?}", update_id, indexing_step) - }); - - info!("document addition done: {:?}", result); - - match result { - Ok(addition_result) => wtxn - .commit() - .and(Ok(UpdateResult::DocumentsAddition(addition_result))) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn clear_documents(&self, update_builder: UpdateBuilder, index: &Index) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = index.write_txn()?; - let builder = update_builder.clear_documents(&mut wtxn, index); - - match builder.execute() { - Ok(_count) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn update_settings( - &self, - settings: &Settings, - update_builder: UpdateBuilder, - index: &Index, - ) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = index.write_txn()?; - let mut builder = update_builder.settings(&mut wtxn, index); - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref names) = settings.searchable_attributes { - match names { - Some(names) => builder.set_searchable_fields(names.clone()), - None => builder.reset_searchable_fields(), - } - } - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref names) = settings.displayed_attributes { - match names { - Some(names) => builder.set_displayed_fields(names.clone()), - None => builder.reset_displayed_fields(), - } - } - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref facet_types) = settings.faceted_attributes { - let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new()); - builder.set_faceted_fields(facet_types); - } - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref criteria) = settings.criteria { - match criteria { - Some(criteria) => builder.set_criteria(criteria.clone()), - None => builder.reset_criteria(), - } - } - - let result = builder - .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); - - match result { - Ok(()) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn update_facets( - &self, - levels: &Facets, - update_builder: UpdateBuilder, - index: &Index, - ) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = index.write_txn()?; - let mut builder = update_builder.facets(&mut wtxn, index); - if let Some(value) = levels.level_group_size { - builder.level_group_size(value); - } - if let Some(value) = levels.min_level_size { - builder.min_level_size(value); - } - match builder.execute() { - Ok(()) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn delete_documents( - &self, - document_ids: File, - update_builder: UpdateBuilder, - index: &Index, - ) -> anyhow::Result { - let ids: Vec = serde_json::from_reader(document_ids)?; - let mut txn = index.write_txn()?; - let mut builder = update_builder.delete_documents(&mut txn, index)?; - - // We ignore unexisting document ids - ids.iter().for_each(|id| { builder.delete_external_id(id); }); - - match builder.execute() { - Ok(deleted) => txn - .commit() - .and(Ok(UpdateResult::DocumentDeletion { deleted })) - .map_err(Into::into), - Err(e) => Err(e.into()) - } - } - - pub fn handle_update( - &self, - meta: Processing, - content: File, - index: &Index, - ) -> Result, Failed> { - use UpdateMeta::*; - - let update_id = meta.id(); - - let update_builder = self.update_buidler(update_id); - - let result = match meta.meta() { - DocumentsAddition { - method, - format, - primary_key, - } => self.update_documents( - *format, - *method, - content, - update_builder, - primary_key.as_deref(), - index, - ), - ClearDocuments => self.clear_documents(update_builder, index), - DeleteDocuments => self.delete_documents(content, update_builder, index), - Settings(settings) => self.update_settings(settings, update_builder, index), - Facets(levels) => self.update_facets(levels, update_builder, index), - }; - - match result { - Ok(result) => Ok(meta.process(result)), - Err(e) => Err(meta.fail(e.to_string())), - } - } -} diff --git a/meilisearch-http/src/index_controller/actor_index_controller/update_store.rs b/meilisearch-http/src/index_controller/actor_index_controller/update_store.rs deleted file mode 100644 index 371ac7bd9..000000000 --- a/meilisearch-http/src/index_controller/actor_index_controller/update_store.rs +++ /dev/null @@ -1,423 +0,0 @@ -use std::path::Path; -use std::sync::{Arc, RwLock}; -use std::io::{Cursor, SeekFrom, Seek}; - -use crossbeam_channel::Sender; -use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; -use heed::{EnvOpenOptions, Env, Database}; -use serde::{Serialize, Deserialize}; -use std::fs::File; -use uuid::Uuid; - -use crate::index_controller::updates::*; - -type BEU64 = heed::zerocopy::U64; - -#[derive(Clone)] -pub struct UpdateStore { - env: Env, - pending_meta: Database, SerdeJson>>, - pending: Database, ByteSlice>, - processed_meta: Database, SerdeJson>>, - failed_meta: Database, SerdeJson>>, - aborted_meta: Database, SerdeJson>>, - processing: Arc>>>, - notification_sender: Sender<()>, -} - -pub trait HandleUpdate { - fn handle_update(&mut self, meta: Processing, content: File) -> Result, Failed>; -} - -impl HandleUpdate for F -where F: FnMut(Processing, File) -> Result, Failed> -{ - fn handle_update(&mut self, meta: Processing, content: File) -> Result, Failed> { - self(meta, content) - } -} - -impl UpdateStore -where - M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone, - N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, - E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync, -{ - pub fn open( - mut options: EnvOpenOptions, - path: P, - mut update_handler: U, - ) -> heed::Result> - where - P: AsRef, - U: HandleUpdate + Send + 'static, - { - options.max_dbs(5); - - let env = options.open(path)?; - let pending_meta = env.create_database(Some("pending-meta"))?; - let pending = env.create_database(Some("pending"))?; - let processed_meta = env.create_database(Some("processed-meta"))?; - let aborted_meta = env.create_database(Some("aborted-meta"))?; - let failed_meta = env.create_database(Some("failed-meta"))?; - let processing = Arc::new(RwLock::new(None)); - - let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); - // Send a first notification to trigger the process. - let _ = notification_sender.send(()); - - let update_store = Arc::new(UpdateStore { - env, - pending, - pending_meta, - processed_meta, - aborted_meta, - notification_sender, - failed_meta, - processing, - }); - - // We need a weak reference so we can take ownership on the arc later when we - // want to close the index. - let update_store_weak = Arc::downgrade(&update_store); - std::thread::spawn(move || { - // Block and wait for something to process. - 'outer: for _ in notification_receiver { - loop { - match update_store_weak.upgrade() { - Some(update_store) => { - match update_store.process_pending_update(&mut update_handler) { - Ok(Some(_)) => (), - Ok(None) => break, - Err(e) => eprintln!("error while processing update: {}", e), - } - } - // the ownership on the arc has been taken, we need to exit. - None => break 'outer, - } - } - } - }); - - Ok(update_store) - } - - pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { - self.env.prepare_for_closing() - } - - /// Returns the new biggest id to use to store the new update. - fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { - let last_pending = self.pending_meta - .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); - - let last_processed = self.processed_meta - .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); - - let last_aborted = self.aborted_meta - .remap_data_type::() - .last(txn)? - .map(|(k, _)| k.get()); - - let last_update_id = [last_pending, last_processed, last_aborted] - .iter() - .copied() - .flatten() - .max(); - - match last_update_id { - Some(last_id) => Ok(last_id + 1), - None => Ok(0), - } - } - - /// Registers the update content in the pending store and the meta - /// into the pending-meta store. Returns the new unique update id. - pub fn register_update( - &self, - meta: M, - content: &[u8], - index_uuid: Uuid, - ) -> heed::Result> { - let mut wtxn = self.env.write_txn()?; - - // We ask the update store to give us a new update id, this is safe, - // no other update can have the same id because we use a write txn before - // asking for the id and registering it so other update registering - // will be forced to wait for a new write txn. - let update_id = self.new_update_id(&wtxn)?; - let update_key = BEU64::new(update_id); - - let meta = Pending::new(meta, update_id, index_uuid); - self.pending_meta.put(&mut wtxn, &update_key, &meta)?; - self.pending.put(&mut wtxn, &update_key, content)?; - - wtxn.commit()?; - - if let Err(e) = self.notification_sender.try_send(()) { - assert!(!e.is_disconnected(), "update notification channel is disconnected"); - } - Ok(meta) - } - /// Executes the user provided function on the next pending update (the one with the lowest id). - /// This is asynchronous as it let the user process the update with a read-only txn and - /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, handler: &mut U) -> heed::Result> - where - U: HandleUpdate + Send + 'static, - { - // Create a read transaction to be able to retrieve the pending update in order. - let rtxn = self.env.read_txn()?; - let first_meta = self.pending_meta.first(&rtxn)?; - - // If there is a pending update we process and only keep - // a reader while processing it, not a writer. - match first_meta { - Some((first_id, pending)) => { - let first_content = self.pending - .get(&rtxn, &first_id)? - .expect("associated update content"); - - // we change the state of the update from pending to processing before we pass it - // to the update handler. Processing store is non persistent to be able recover - // from a failure - let processing = pending.processing(); - self.processing - .write() - .unwrap() - .replace(processing.clone()); - let mut cursor = Cursor::new(first_content); - let mut file = tempfile::tempfile()?; - std::io::copy(&mut cursor, &mut file)?; - file.seek(SeekFrom::Start(0))?; - // Process the pending update using the provided user function. - let result = handler.handle_update(processing, file); - drop(rtxn); - - // Once the pending update have been successfully processed - // we must remove the content from the pending and processing stores and - // write the *new* meta to the processed-meta store and commit. - let mut wtxn = self.env.write_txn()?; - self.processing - .write() - .unwrap() - .take(); - self.pending_meta.delete(&mut wtxn, &first_id)?; - self.pending.delete(&mut wtxn, &first_id)?; - match result { - Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, - Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?, - } - wtxn.commit()?; - - Ok(Some(())) - }, - None => Ok(None) - } - } - - /// Execute the user defined function with the meta-store iterators, the first - /// iterator is the *processed* meta one, the second the *aborted* meta one - /// and, the last is the *pending* meta one. - pub fn iter_metas(&self, mut f: F) -> heed::Result - where - F: for<'a> FnMut( - Option>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - heed::RoIter<'a, OwnedType, SerdeJson>>, - ) -> heed::Result, - { - let rtxn = self.env.read_txn()?; - - // We get the pending, processed and aborted meta iterators. - let processed_iter = self.processed_meta.iter(&rtxn)?; - let aborted_iter = self.aborted_meta.iter(&rtxn)?; - let pending_iter = self.pending_meta.iter(&rtxn)?; - let processing = self.processing.read().unwrap().clone(); - let failed_iter = self.failed_meta.iter(&rtxn)?; - - // We execute the user defined function with both iterators. - (f)(processing, processed_iter, aborted_iter, pending_iter, failed_iter) - } - - /// Returns the update associated meta or `None` if the update doesn't exist. - pub fn meta(&self, update_id: u64) -> heed::Result>> { - let rtxn = self.env.read_txn()?; - let key = BEU64::new(update_id); - - if let Some(ref meta) = *self.processing.read().unwrap() { - if meta.id() == update_id { - return Ok(Some(UpdateStatus::Processing(meta.clone()))); - } - } - - if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Pending(meta))); - } - - if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Processed(meta))); - } - - if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Aborted(meta))); - } - - if let Some(meta) = self.failed_meta.get(&rtxn, &key)? { - return Ok(Some(UpdateStatus::Failed(meta))); - } - - Ok(None) - } - - /// Aborts an update, an aborted update content is deleted and - /// the meta of it is moved into the aborted updates database. - /// - /// Trying to abort an update that is currently being processed, an update - /// that as already been processed or which doesn't actually exist, will - /// return `None`. - #[allow(dead_code)] - pub fn abort_update(&self, update_id: u64) -> heed::Result>> { - let mut wtxn = self.env.write_txn()?; - let key = BEU64::new(update_id); - - // We cannot abort an update that is currently being processed. - if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) { - return Ok(None); - } - - let pending = match self.pending_meta.get(&wtxn, &key)? { - Some(meta) => meta, - None => return Ok(None), - }; - - let aborted = pending.abort(); - - self.aborted_meta.put(&mut wtxn, &key, &aborted)?; - self.pending_meta.delete(&mut wtxn, &key)?; - self.pending.delete(&mut wtxn, &key)?; - - wtxn.commit()?; - - Ok(Some(aborted)) - } - - /// Aborts all the pending updates, and not the one being currently processed. - /// Returns the update metas and ids that were successfully aborted. - #[allow(dead_code)] - pub fn abort_pendings(&self) -> heed::Result)>> { - let mut wtxn = self.env.write_txn()?; - let mut aborted_updates = Vec::new(); - - // We skip the first pending update as it is currently being processed. - for result in self.pending_meta.iter(&wtxn)?.skip(1) { - let (key, pending) = result?; - let id = key.get(); - aborted_updates.push((id, pending.abort())); - } - - for (id, aborted) in &aborted_updates { - let key = BEU64::new(*id); - self.aborted_meta.put(&mut wtxn, &key, &aborted)?; - self.pending_meta.delete(&mut wtxn, &key)?; - self.pending.delete(&mut wtxn, &key)?; - } - - wtxn.commit()?; - - Ok(aborted_updates) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::thread; - use std::time::{Duration, Instant}; - - impl HandleUpdate for F - where F: FnMut(Processing, &[u8]) -> Result, Failed> + Send + 'static { - fn handle_update(&mut self, meta: Processing, content: &[u8]) -> Result, Failed> { - self(meta, content) - } - } - - #[test] - fn simple() { - let dir = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir, |meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { - let new_meta = meta.meta().to_string() + " processed"; - let processed = meta.process(new_meta); - Ok(processed) - }).unwrap(); - - let meta = String::from("kiki"); - let update = update_store.register_update(meta, &[]).unwrap(); - thread::sleep(Duration::from_millis(100)); - let meta = update_store.meta(update.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "kiki processed"); - } else { - panic!() - } - } - - #[test] - #[ignore] - fn long_running_update() { - let dir = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir, |meta: Processing, _content:&_| -> Result<_, Failed<_, ()>> { - thread::sleep(Duration::from_millis(400)); - let new_meta = meta.meta().to_string() + "processed"; - let processed = meta.process(new_meta); - Ok(processed) - }).unwrap(); - - let before_register = Instant::now(); - - let meta = String::from("kiki"); - let update_kiki = update_store.register_update(meta, &[]).unwrap(); - assert!(before_register.elapsed() < Duration::from_millis(200)); - - let meta = String::from("coco"); - let update_coco = update_store.register_update(meta, &[]).unwrap(); - assert!(before_register.elapsed() < Duration::from_millis(200)); - - let meta = String::from("cucu"); - let update_cucu = update_store.register_update(meta, &[]).unwrap(); - assert!(before_register.elapsed() < Duration::from_millis(200)); - - thread::sleep(Duration::from_millis(400 * 3 + 100)); - - let meta = update_store.meta(update_kiki.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "kiki processed"); - } else { - panic!() - } - - let meta = update_store.meta(update_coco.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "coco processed"); - } else { - panic!() - } - - let meta = update_store.meta(update_cucu.id()).unwrap().unwrap(); - if let UpdateStatus::Processed(Processed { success, .. }) = meta { - assert_eq!(success, "cucu processed"); - } else { - panic!() - } - } -}