use std::collections::HashMap;
use std::fs::File;
use std::io;

use anyhow::Result;
use flate2::read::GzDecoder;
use grenad::CompressionType;
use log::info;
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use milli::Index;
use rayon::ThreadPool;

use crate::index_controller::updates::{Failed, Processed, Processing};
use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult};
use crate::option::IndexerOpts;

pub struct UpdateHandler {
    max_nb_chunks: Option<usize>,
    chunk_compression_level: Option<u32>,
    thread_pool: ThreadPool,
    log_frequency: usize,
    max_memory: usize,
    linked_hash_map_size: usize,
    chunk_compression_type: CompressionType,
    chunk_fusing_shrink_size: u64,
}

impl UpdateHandler {
    pub fn new(opt: &IndexerOpts) -> anyhow::Result<Self> {
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(opt.indexing_jobs.unwrap_or(0))
            .build()?;

        Ok(Self {
            max_nb_chunks: opt.max_nb_chunks,
            chunk_compression_level: opt.chunk_compression_level,
            thread_pool,
            log_frequency: opt.log_every_n,
            max_memory: opt.max_memory.get_bytes() as usize,
            linked_hash_map_size: opt.linked_hash_map_size,
            chunk_compression_type: opt.chunk_compression_type,
            chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
        })
    }

    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
        // We prepare the update by using the update builder.
        let mut update_builder = UpdateBuilder::new(update_id);
        if let Some(max_nb_chunks) = self.max_nb_chunks {
            update_builder.max_nb_chunks(max_nb_chunks);
        }
        if let Some(chunk_compression_level) = self.chunk_compression_level {
            update_builder.chunk_compression_level(chunk_compression_level);
        }
        update_builder.thread_pool(&self.thread_pool);
        update_builder.log_every_n(self.log_frequency);
        update_builder.max_memory(self.max_memory);
        update_builder.linked_hash_map_size(self.linked_hash_map_size);
        update_builder.chunk_compression_type(self.chunk_compression_type);
        update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
        update_builder
    }

    fn update_documents(
        &self,
        format: UpdateFormat,
        method: IndexDocumentsMethod,
        content: File,
        update_builder: UpdateBuilder,
        primary_key: Option<&str>,
        index: &Index,
    ) -> anyhow::Result<UpdateResult> {
        info!("performing document addition");

        // We must use the write transaction of the update here.
        let mut wtxn = index.write_txn()?;

        // Set the primary key if it is not set already, ignore it otherwise.
        match (index.primary_key(&wtxn)?, primary_key) {
            (None, Some(ref primary_key)) => {
                index.put_primary_key(&mut wtxn, primary_key)?;
            }
            _ => (),
        }

        let mut builder = update_builder.index_documents(&mut wtxn, index);
        builder.update_format(format);
        builder.index_documents_method(method);

        // The payload is not gzipped for now; the decoder path is kept for later use.
        let gzipped = false;
        let reader = if gzipped {
            Box::new(GzDecoder::new(content))
        } else {
            Box::new(content) as Box<dyn io::Read>
        };

        let result = builder.execute(reader, |indexing_step, update_id| {
            info!("update {}: {:?}", update_id, indexing_step)
        });

        info!("document addition done: {:?}", result);

        match result {
            Ok(addition_result) => wtxn
                .commit()
                .and(Ok(UpdateResult::DocumentsAddition(addition_result)))
                .map_err(Into::into),
            Err(e) => Err(e.into()),
        }
    }

    fn clear_documents(&self, update_builder: UpdateBuilder, index: &Index) -> anyhow::Result<UpdateResult> {
        // We must use the write transaction of the update here.
        let mut wtxn = index.write_txn()?;
        let builder = update_builder.clear_documents(&mut wtxn, index);

        match builder.execute() {
            Ok(_count) => wtxn
                .commit()
                .and(Ok(UpdateResult::Other))
                .map_err(Into::into),
            Err(e) => Err(e.into()),
        }
    }

    fn update_settings(
        &self,
        settings: &Settings,
        update_builder: UpdateBuilder,
        index: &Index,
    ) -> anyhow::Result<UpdateResult> {
        // We must use the write transaction of the update here.
        let mut wtxn = index.write_txn()?;
        let mut builder = update_builder.settings(&mut wtxn, index);

        // We transpose the settings JSON struct into a real setting update.
        if let Some(ref names) = settings.searchable_attributes {
            match names {
                Some(names) => builder.set_searchable_fields(names.clone()),
                None => builder.reset_searchable_fields(),
            }
        }

        // We transpose the settings JSON struct into a real setting update.
        if let Some(ref names) = settings.displayed_attributes {
            match names {
                Some(names) => builder.set_displayed_fields(names.clone()),
                None => builder.reset_displayed_fields(),
            }
        }

        // We transpose the settings JSON struct into a real setting update.
        if let Some(ref facet_types) = settings.faceted_attributes {
            let facet_types = facet_types.clone().unwrap_or_else(HashMap::new);
            builder.set_faceted_fields(facet_types);
        }

        // We transpose the settings JSON struct into a real setting update.
        if let Some(ref criteria) = settings.criteria {
            match criteria {
                Some(criteria) => builder.set_criteria(criteria.clone()),
                None => builder.reset_criteria(),
            }
        }

        let result = builder
            .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));

        match result {
            Ok(()) => wtxn
                .commit()
                .and(Ok(UpdateResult::Other))
                .map_err(Into::into),
            Err(e) => Err(e.into()),
        }
    }

    fn update_facets(
        &self,
        levels: &Facets,
        update_builder: UpdateBuilder,
        index: &Index,
    ) -> anyhow::Result<UpdateResult> {
        // We must use the write transaction of the update here.
        let mut wtxn = index.write_txn()?;
        let mut builder = update_builder.facets(&mut wtxn, index);

        if let Some(value) = levels.level_group_size {
            builder.level_group_size(value);
        }

        if let Some(value) = levels.min_level_size {
            builder.min_level_size(value);
        }

        match builder.execute() {
            Ok(()) => wtxn
                .commit()
                .and(Ok(UpdateResult::Other))
                .map_err(Into::into),
            Err(e) => Err(e.into()),
        }
    }

    fn delete_documents(
        &self,
        document_ids: File,
        update_builder: UpdateBuilder,
        index: &Index,
    ) -> anyhow::Result<UpdateResult> {
        // The payload is a JSON array of the external ids of the documents to delete.
        let ids: Vec<String> = serde_json::from_reader(document_ids)?;
        let mut txn = index.write_txn()?;
        let mut builder = update_builder.delete_documents(&mut txn, index)?;

        // We ignore nonexistent document ids.
        ids.iter().for_each(|id| {
            builder.delete_external_id(id);
        });

        match builder.execute() {
            Ok(deleted) => txn
                .commit()
                .and(Ok(UpdateResult::DocumentDeletion { deleted }))
                .map_err(Into::into),
            Err(e) => Err(e.into()),
        }
    }

    pub fn handle_update(
        &self,
        meta: Processing<UpdateMeta>,
        content: File,
        index: &Index,
    ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
        use UpdateMeta::*;

        let update_id = meta.id();
        let update_builder = self.update_builder(update_id);

        let result = match meta.meta() {
            DocumentsAddition {
                method,
                format,
                primary_key,
            } => self.update_documents(
                *format,
                *method,
                content,
                update_builder,
                primary_key.as_deref(),
                index,
            ),
            ClearDocuments => self.clear_documents(update_builder, index),
            DeleteDocuments => self.delete_documents(content, update_builder, index),
            Settings(settings) => self.update_settings(settings, update_builder, index),
            Facets(levels) => self.update_facets(levels, update_builder, index),
        };

        match result {
            Ok(result) => Ok(meta.process(result)),
            Err(e) => Err(meta.fail(e.to_string())),
        }
    }
}
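
// Illustrative usage sketch (comment only, not compiled as part of this module): the
// update store or actor that owns an `UpdateHandler` is assumed to call it once per
// dequeued update and then persist the returned `Processed` or `Failed` status. The
// names `indexer_opts`, `processing_meta`, `content_file` and `index` below are
// placeholders for values owned by the caller, not items defined in this file:
//
//     let handler = UpdateHandler::new(&indexer_opts)?;
//     match handler.handle_update(processing_meta, content_file, &index) {
//         Ok(processed) => { /* record the processed status for this update id */ }
//         Err(failed) => { /* record the failed status and its error message */ }
//     }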