From 54861335a0f56c46e1c20cac7df85d617501d668 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 29 Dec 2020 11:11:06 +0100 Subject: [PATCH] retrieve update status --- Cargo.lock | 52 ++++++- src/data/mod.rs | 94 +++++++++++++ src/{data.rs => data/search.rs} | 240 ++++++++------------------------ src/data/updates.rs | 53 +++++++ src/routes/index.rs | 24 +++- src/updates/mod.rs | 112 +++++---------- 6 files changed, 308 insertions(+), 267 deletions(-) create mode 100644 src/data/mod.rs rename src/{data.rs => data/search.rs} (56%) create mode 100644 src/data/updates.rs diff --git a/Cargo.lock b/Cargo.lock index c5cea3936..13f74b82d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1188,6 +1188,22 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heed" +version = "0.10.6" +dependencies = [ + "byteorder", + "heed-traits 0.7.0", + "heed-types 0.7.2", + "libc", + "lmdb-rkv-sys", + "once_cell", + "page_size", + "synchronoise", + "url", + "zerocopy", +] + [[package]] name = "heed" version = "0.10.6" @@ -1195,8 +1211,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afcc6c911acaadad3ebe9f1ef1707d80bd71c92037566f47b6238a03b60adf1a" dependencies = [ "byteorder", - "heed-traits", - "heed-types", + "heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "heed-types 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "libc", "lmdb-rkv-sys", "once_cell", @@ -1207,12 +1223,27 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "heed-traits" +version = "0.7.0" + [[package]] name = "heed-traits" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c" +[[package]] +name = "heed-types" +version = "0.7.2" +dependencies = [ + "bincode", + "heed-traits 0.7.0", + "serde", + "serde_json", + "zerocopy", +] + [[package]] name = "heed-types" version = "0.7.2" @@ -1220,7 +1251,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e628efb08beaee58355f80dc4adba79d644940ea9eef60175ea17dc218aab405" dependencies = [ "bincode", - "heed-traits", + "heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde", "serde_json", "zerocopy", @@ -1615,7 +1646,7 @@ dependencies = [ "futures", "futures-util", "grenad", - "heed", + "heed 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)", "http", "indexmap", "jemallocator", @@ -1699,6 +1730,7 @@ dependencies = [ "bstr", "byte-unit", "byteorder", + "chrono", "crossbeam-channel", "csv", "either", @@ -1706,7 +1738,7 @@ dependencies = [ "fst", "fxhash", "grenad", - "heed", + "heed 0.10.6", "human_format", "itertools", "jemallocator", @@ -1728,6 +1760,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "serde_millis", "slice-group-by", "smallstr", "smallvec", @@ -2594,6 +2627,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_millis" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e2dc780ca5ee2c369d1d01d100270203c4ff923d2a4264812d723766434d00" +dependencies = [ + "serde", +] + [[package]] name = "serde_qs" version = "0.8.2" diff --git a/src/data/mod.rs b/src/data/mod.rs new file mode 100644 index 000000000..a699c9d20 --- /dev/null +++ b/src/data/mod.rs @@ -0,0 +1,94 @@ +mod search; +mod updates; + +pub use search::{SearchQuery, SearchResult}; + +use std::fs::create_dir_all; +use std::ops::Deref; +use std::sync::Arc; + +use milli::Index; +use sha2::Digest; + +use crate::option::Opt; +use crate::updates::UpdateQueue; + +#[derive(Clone)] +pub struct Data { + inner: Arc, +} + +impl Deref for Data { + type Target = DataInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Clone)] +pub struct DataInner { + pub indexes: Arc, + pub update_queue: Arc, + api_keys: ApiKeys, + options: Opt, +} + +#[derive(Clone)] +pub struct ApiKeys { + pub public: Option, + pub private: Option, + pub master: Option, +} + +impl ApiKeys { + pub fn generate_missing_api_keys(&mut self) { + if let Some(master_key) = &self.master { + if self.private.is_none() { + let key = format!("{}-private", master_key); + let sha = sha2::Sha256::digest(key.as_bytes()); + self.private = Some(format!("{:x}", sha)); + } + if self.public.is_none() { + let key = format!("{}-public", master_key); + let sha = sha2::Sha256::digest(key.as_bytes()); + self.public = Some(format!("{:x}", sha)); + } + } + } +} + +impl Data { + pub fn new(options: Opt) -> anyhow::Result { + let db_size = options.max_mdb_size.get_bytes() as usize; + let path = options.db_path.join("main"); + create_dir_all(&path)?; + let indexes = Index::new(&path, Some(db_size))?; + let indexes = Arc::new(indexes); + + let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?); + + let mut api_keys = ApiKeys { + master: options.clone().master_key, + private: None, + public: None, + }; + + api_keys.generate_missing_api_keys(); + + let inner = DataInner { indexes, options, update_queue, api_keys }; + let inner = Arc::new(inner); + + Ok(Data { inner }) + } + + #[inline] + pub fn http_payload_size_limit(&self) -> usize { + self.options.http_payload_size_limit.get_bytes() as usize + } + + #[inline] + pub fn api_keys(&self) -> &ApiKeys { + &self.api_keys + } +} diff --git a/src/data.rs b/src/data/search.rs similarity index 56% rename from src/data.rs rename to src/data/search.rs index 48826a44f..bd22a959b 100644 --- a/src/data.rs +++ b/src/data/search.rs @@ -1,28 +1,20 @@ use std::borrow::Cow; use std::collections::HashSet; -use std::fs::create_dir_all; use std::mem; -use std::ops::Deref; -use std::sync::Arc; use std::time::Instant; -use async_compression::tokio_02::write::GzipEncoder; -use futures_util::stream::StreamExt; -use tokio::io::AsyncWriteExt; -use milli::{Index, SearchResult as Results, obkv_to_json}; -use milli::update::{IndexDocumentsMethod, UpdateFormat}; -use sha2::Digest; use serde_json::{Value, Map}; use serde::{Deserialize, Serialize}; +use milli::{SearchResult as Results, obkv_to_json}; use meilisearch_tokenizer::{Analyzer, AnalyzerConfig}; -use crate::option::Opt; -use crate::updates::{UpdateQueue, UpdateMeta, UpdateStatus, UpdateMetaProgress}; +use super::Data; const DEFAULT_SEARCH_LIMIT: usize = 20; #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] +#[allow(dead_code)] pub struct SearchQuery { q: Option, offset: Option, @@ -48,176 +40,6 @@ pub struct SearchResult { processing_time_ms: u128, } -#[derive(Clone)] -pub struct Data { - inner: Arc, -} - -impl Deref for Data { - type Target = DataInner; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -#[derive(Clone)] -pub struct DataInner { - pub indexes: Arc, - pub update_queue: Arc, - api_keys: ApiKeys, - options: Opt, -} - -#[derive(Clone)] -pub struct ApiKeys { - pub public: Option, - pub private: Option, - pub master: Option, -} - -impl ApiKeys { - pub fn generate_missing_api_keys(&mut self) { - if let Some(master_key) = &self.master { - if self.private.is_none() { - let key = format!("{}-private", master_key); - let sha = sha2::Sha256::digest(key.as_bytes()); - self.private = Some(format!("{:x}", sha)); - } - if self.public.is_none() { - let key = format!("{}-public", master_key); - let sha = sha2::Sha256::digest(key.as_bytes()); - self.public = Some(format!("{:x}", sha)); - } - } - } -} - -impl Data { - pub fn new(options: Opt) -> anyhow::Result { - let db_size = options.max_mdb_size.get_bytes() as usize; - let path = options.db_path.join("main"); - create_dir_all(&path)?; - let indexes = Index::new(&path, Some(db_size))?; - let indexes = Arc::new(indexes); - - let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?); - - let mut api_keys = ApiKeys { - master: options.clone().master_key, - private: None, - public: None, - }; - - api_keys.generate_missing_api_keys(); - - let inner = DataInner { indexes, options, update_queue, api_keys }; - let inner = Arc::new(inner); - - Ok(Data { inner }) - } - - pub async fn add_documents( - &self, - _index: S, - method: IndexDocumentsMethod, - format: UpdateFormat, - mut stream: impl futures::Stream> + Unpin, - ) -> anyhow::Result> - where - B: Deref, - E: std::error::Error + Send + Sync + 'static, - S: AsRef, - { - let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; - let file = tokio::fs::File::from_std(file?); - let mut encoder = GzipEncoder::new(file); - - while let Some(result) = stream.next().await { - let bytes = &*result?; - encoder.write_all(&bytes[..]).await?; - } - - encoder.shutdown().await?; - let mut file = encoder.into_inner(); - file.sync_all().await?; - let file = file.into_std().await; - let mmap = unsafe { memmap::Mmap::map(&file)? }; - - let meta = UpdateMeta::DocumentsAddition { method, format }; - - let queue = self.update_queue.clone(); - let meta_cloned = meta.clone(); - let update_id = tokio::task::spawn_blocking(move || queue.register_update(&meta_cloned, &mmap[..])).await??; - - Ok(UpdateStatus::Pending { update_id, meta }) - } - - pub fn search>(&self, _index: S, search_query: SearchQuery) -> anyhow::Result { - let start = Instant::now(); - let index = &self.indexes; - let rtxn = index.read_txn()?; - - let mut search = index.search(&rtxn); - if let Some(query) = &search_query.q { - search.query(query); - } - - if let Some(offset) = search_query.offset { - search.offset(offset); - } - - let limit = search_query.limit.unwrap_or(DEFAULT_SEARCH_LIMIT); - search.limit(limit); - - let Results { found_words, documents_ids, nb_hits, .. } = search.execute().unwrap(); - - let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - - let displayed_fields = match index.displayed_fields(&rtxn).unwrap() { - Some(fields) => Cow::Borrowed(fields), - None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()), - }; - - let attributes_to_highlight = match search_query.attributes_to_highlight { - Some(fields) => fields.iter().map(ToOwned::to_owned).collect(), - None => HashSet::new(), - }; - - let stop_words = fst::Set::default(); - let highlighter = Highlighter::new(&stop_words); - let mut documents = Vec::new(); - for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() { - let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap(); - highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight); - documents.push(object); - } - - let processing_time_ms = start.elapsed().as_millis(); - - let result = SearchResult { - hits: documents, - nb_hits, - query: search_query.q.unwrap_or_default(), - offset: search_query.offset.unwrap_or(0), - limit, - processing_time_ms, - }; - - Ok(result) - } - - #[inline] - pub fn http_payload_size_limit(&self) -> usize { - self.options.http_payload_size_limit.get_bytes() as usize - } - - #[inline] - pub fn api_keys(&self) -> &ApiKeys { - &self.api_keys - } -} - struct Highlighter<'a, A> { analyzer: Analyzer<'a, A>, } @@ -276,3 +98,59 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> { } } } + +impl Data { + pub fn search>(&self, _index: S, search_query: SearchQuery) -> anyhow::Result { + let start = Instant::now(); + let index = &self.indexes; + let rtxn = index.read_txn()?; + + let mut search = index.search(&rtxn); + if let Some(query) = &search_query.q { + search.query(query); + } + + if let Some(offset) = search_query.offset { + search.offset(offset); + } + + let limit = search_query.limit.unwrap_or(DEFAULT_SEARCH_LIMIT); + search.limit(limit); + + let Results { found_words, documents_ids, nb_hits, .. } = search.execute().unwrap(); + + let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + + let displayed_fields = match index.displayed_fields(&rtxn).unwrap() { + Some(fields) => Cow::Borrowed(fields), + None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()), + }; + + let attributes_to_highlight = match search_query.attributes_to_highlight { + Some(fields) => fields.iter().map(ToOwned::to_owned).collect(), + None => HashSet::new(), + }; + + let stop_words = fst::Set::default(); + let highlighter = Highlighter::new(&stop_words); + let mut documents = Vec::new(); + for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() { + let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap(); + highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight); + documents.push(object); + } + + let processing_time_ms = start.elapsed().as_millis(); + + let result = SearchResult { + hits: documents, + nb_hits, + query: search_query.q.unwrap_or_default(), + offset: search_query.offset.unwrap_or(0), + limit, + processing_time_ms, + }; + + Ok(result) + } +} diff --git a/src/data/updates.rs b/src/data/updates.rs new file mode 100644 index 000000000..d12f271c8 --- /dev/null +++ b/src/data/updates.rs @@ -0,0 +1,53 @@ +use std::ops::Deref; + +use async_compression::tokio_02::write::GzipEncoder; +use futures_util::stream::StreamExt; +use tokio::io::AsyncWriteExt; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; +use milli::update_store::UpdateStatus; + +use super::Data; +use crate::updates::UpdateMeta; + +impl Data { + pub async fn add_documents( + &self, + _index: S, + method: IndexDocumentsMethod, + format: UpdateFormat, + mut stream: impl futures::Stream> + Unpin, + ) -> anyhow::Result> + where + B: Deref, + E: std::error::Error + Send + Sync + 'static, + S: AsRef, + { + let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; + let file = tokio::fs::File::from_std(file?); + let mut encoder = GzipEncoder::new(file); + + while let Some(result) = stream.next().await { + let bytes = &*result?; + encoder.write_all(&bytes[..]).await?; + } + + encoder.shutdown().await?; + let mut file = encoder.into_inner(); + file.sync_all().await?; + let file = file.into_std().await; + let mmap = unsafe { memmap::Mmap::map(&file)? }; + + let meta = UpdateMeta::DocumentsAddition { method, format }; + + let queue = self.update_queue.clone(); + let update = tokio::task::spawn_blocking(move || queue.register_update(meta, &mmap[..])).await??; + + Ok(update.into()) + } + + + #[inline] + pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result>> { + self.update_queue.get_update_status(uid) + } +} diff --git a/src/routes/index.rs b/src/routes/index.rs index 36860f34f..eb961a8dd 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -1,6 +1,7 @@ use actix_web::{delete, get, post, put}; use actix_web::{web, HttpResponse}; use chrono::{DateTime, Utc}; +use log::error; use serde::{Deserialize, Serialize}; use crate::Data; @@ -93,8 +94,8 @@ async fn delete_index( #[derive(Deserialize)] struct UpdateParam { - _index_uid: String, - _update_id: u64, + index_uid: String, + update_id: u64, } #[get( @@ -102,10 +103,23 @@ struct UpdateParam { wrap = "Authentication::Private" )] async fn get_update_status( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + let result = data.get_update_status(&path.index_uid, path.update_id); + match result { + Ok(Some(meta)) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Ok(None) => { + todo!() + } + Err(e) => { + error!("{}", e); + todo!() + } + } } #[get("/indexes/{index_uid}/updates", wrap = "Authentication::Private")] diff --git a/src/updates/mod.rs b/src/updates/mod.rs index f08439c74..cd8052108 100644 --- a/src/updates/mod.rs +++ b/src/updates/mod.rs @@ -8,14 +8,15 @@ use std::ops::Deref; use std::fs::create_dir_all; use anyhow::Result; +use byte_unit::Byte; use flate2::read::GzDecoder; use grenad::CompressionType; -use byte_unit::Byte; -use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, UpdateIndexingStep::*}; -use milli::{UpdateStore, UpdateHandler as Handler, Index}; +use log::info; +use milli::Index; +use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod }; +use milli::update_store::{UpdateStore, UpdateHandler as Handler, UpdateStatus, Processing, Processed, Failed}; use rayon::ThreadPool; use serde::{Serialize, Deserialize}; -use tokio::sync::broadcast; use structopt::StructOpt; use crate::option::Opt; @@ -40,23 +41,13 @@ pub enum UpdateMetaProgress { }, } -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type")] -#[allow(dead_code)] -pub enum UpdateStatus { - Pending { update_id: u64, meta: M }, - Progressing { update_id: u64, meta: P }, - Processed { update_id: u64, meta: N }, - Aborted { update_id: u64, meta: M }, -} - #[derive(Clone)] pub struct UpdateQueue { - inner: Arc>, + inner: Arc>, } impl Deref for UpdateQueue { - type Target = Arc>; + type Target = Arc>; fn deref(&self) -> &Self::Target { &self.inner @@ -115,8 +106,6 @@ pub struct IndexerOpts { pub indexing_jobs: Option, } -type UpdateSender = broadcast::Sender>; - struct UpdateHandler { indexes: Arc, max_nb_chunks: Option, @@ -127,14 +116,12 @@ struct UpdateHandler { linked_hash_map_size: usize, chunk_compression_type: CompressionType, chunk_fusing_shrink_size: u64, - update_status_sender: UpdateSender, } impl UpdateHandler { fn new( opt: &IndexerOpts, indexes: Arc, - update_status_sender: UpdateSender, ) -> Result { let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(opt.indexing_jobs.unwrap_or(0)) @@ -149,7 +136,6 @@ impl UpdateHandler { linked_hash_map_size: opt.linked_hash_map_size, chunk_compression_type: opt.chunk_compression_type, chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(), - update_status_sender, }) } @@ -191,23 +177,7 @@ impl UpdateHandler { Box::new(content) as Box }; - let result = builder.execute(reader, |indexing_step, update_id| { - let (current, total) = match indexing_step { - TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), - ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), - IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), - MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)), - }; - let _ = self.update_status_sender.send(UpdateStatus::Progressing { - update_id, - meta: UpdateMetaProgress::DocumentsAddition { - step: indexing_step.step(), - total_steps: indexing_step.number_of_steps(), - current, - total, - } - }); - }); + let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); match result { Ok(()) => wtxn.commit().map_err(Into::into), @@ -226,57 +196,41 @@ impl UpdateHandler { } } - fn update_settings(&self, settings: Settings, update_builder: UpdateBuilder) -> Result<()> { + fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result<()> { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let mut builder = update_builder.settings(&mut wtxn, &self.indexes); // We transpose the settings JSON struct into a real setting update. - if let Some(names) = settings.searchable_attributes { + if let Some(ref names) = settings.searchable_attributes { match names { - Some(names) => builder.set_searchable_fields(names), + Some(names) => builder.set_searchable_fields(&names), None => builder.reset_searchable_fields(), } } // We transpose the settings JSON struct into a real setting update. - if let Some(names) = settings.displayed_attributes { + if let Some(ref names) = settings.displayed_attributes { match names { - Some(names) => builder.set_displayed_fields(names), + Some(names) => builder.set_displayed_fields(&names), None => builder.reset_displayed_fields(), } } // We transpose the settings JSON struct into a real setting update. - if let Some(facet_types) = settings.faceted_attributes { - builder.set_faceted_fields(facet_types); + if let Some(ref facet_types) = settings.faceted_attributes { + builder.set_faceted_fields(&facet_types); } // We transpose the settings JSON struct into a real setting update. - if let Some(criteria) = settings.criteria { + if let Some(ref criteria) = settings.criteria { match criteria { - Some(criteria) => builder.set_criteria(criteria), + Some(criteria) => builder.set_criteria(&criteria), None => builder.reset_criteria(), } } - let result = builder.execute(|indexing_step, update_id| { - let (current, total) = match indexing_step { - TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), - ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), - IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), - MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)), - }; - let _ = self.update_status_sender.send(UpdateStatus::Progressing { - update_id, - meta: UpdateMetaProgress::DocumentsAddition { - step: indexing_step.step(), - total_steps: indexing_step.number_of_steps(), - current, - total, - } - }); - }); + let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); match result { Ok(_count) => wtxn.commit().map_err(Into::into), @@ -284,7 +238,7 @@ impl UpdateHandler { } } - fn update_facets(&self, levels: Facets, update_builder: UpdateBuilder) -> Result<()> { + fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result<()> { // We must use the write transaction of the update here. let mut wtxn = self.indexes.write_txn()?; let mut builder = update_builder.facets(&mut wtxn, &self.indexes); @@ -301,28 +255,30 @@ impl UpdateHandler { } } -impl Handler for UpdateHandler { - fn handle_update(&mut self, update_id: u64, meta: UpdateMeta, content: &[u8]) -> heed::Result { +impl Handler for UpdateHandler { + fn handle_update( + &mut self, + update_id: u64, + meta: Processing, + content: &[u8] + ) -> Result, Failed> { use UpdateMeta::*; let update_builder = self.update_buidler(update_id); - let result: anyhow::Result<()> = match meta { - DocumentsAddition { method, format } => { - self.update_documents(format, method, content, update_builder) - }, + let result: anyhow::Result<()> = match meta.meta() { + DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), ClearDocuments => self.clear_documents(update_builder), Settings(settings) => self.update_settings(settings, update_builder), Facets(levels) => self.update_facets(levels, update_builder), }; - let meta = match result { + let new_meta = match result { Ok(()) => format!("valid update content"), Err(e) => format!("error while processing update content: {:?}", e), }; - let processed = UpdateStatus::Processed { update_id, meta: meta.clone() }; - let _ = self.update_status_sender.send(processed); + let meta = meta.process(new_meta); Ok(meta) } @@ -333,8 +289,7 @@ impl UpdateQueue { opt: &Opt, indexes: Arc, ) -> Result { - let (sender, _) = broadcast::channel(100); - let handler = UpdateHandler::new(&opt.indexer_options, indexes, sender)?; + let handler = UpdateHandler::new(&opt.indexer_options, indexes)?; let size = opt.max_udb_size.get_bytes() as usize; let path = opt.db_path.join("updates.mdb"); create_dir_all(&path)?; @@ -345,4 +300,9 @@ impl UpdateQueue { )?; Ok(Self { inner }) } + + #[inline] + pub fn get_update_status(&self, update_id: u64) -> Result>> { + Ok(self.inner.meta(update_id)?) + } }