diff --git a/src/index_controller/actor_index_controller/index_actor.rs b/src/index_controller/actor_index_controller/index_actor.rs index 40f8de279..6123ca774 100644 --- a/src/index_controller/actor_index_controller/index_actor.rs +++ b/src/index_controller/actor_index_controller/index_actor.rs @@ -1,15 +1,20 @@ -use uuid::Uuid; +use std::fs::{File, create_dir_all}; use std::path::{PathBuf, Path}; -use chrono::Utc; -use tokio::sync::{mpsc, oneshot, RwLock}; -use thiserror::Error; -use std::collections::HashMap; use std::sync::Arc; -use milli::Index; -use std::collections::hash_map::Entry; -use std::fs::create_dir_all; + +use chrono::Utc; use heed::EnvOpenOptions; +use milli::Index; +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use thiserror::Error; +use tokio::sync::{mpsc, oneshot, RwLock}; +use uuid::Uuid; +use log::info; + +use super::update_handler::UpdateHandler; use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult}; +use crate::option::IndexerOpts; pub type Result = std::result::Result; type AsyncMap = Arc>>; @@ -22,6 +27,7 @@ enum IndexMsg { struct IndexActor { inbox: mpsc::Receiver, + update_handler: Arc, store: S, } @@ -36,18 +42,22 @@ pub enum IndexError { #[async_trait::async_trait] trait IndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn get_or_create(&self, uuid: Uuid) -> Result>; } -impl IndexActor { +impl IndexActor { fn new(inbox: mpsc::Receiver, store: S) -> Self { - Self { inbox, store } + let options = IndexerOpts::default(); + let update_handler = UpdateHandler::new(&options).unwrap(); + let update_handler = Arc::new(update_handler); + Self { inbox, store, update_handler } } async fn run(mut self) { loop { match self.inbox.recv().await { Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await, - Some(IndexMsg::Update { ret, meta, data }) => self.handle_update().await, + Some(IndexMsg::Update { ret, meta, data }) => self.handle_update(meta, data, ret).await, None => break, } } @@ -58,8 +68,14 @@ impl IndexActor { let _ = ret.send(result); } - async fn handle_update(&self) { - println!("processing update!!!"); + async fn handle_update(&self, meta: Processing, data: File, ret: oneshot::Sender) { + info!("processing update"); + let uuid = meta.index_uuid().clone(); + let index = self.store.get_or_create(uuid).await.unwrap(); + let update_handler = self.update_handler.clone(); + let result = tokio::task::spawn_blocking(move || update_handler.handle_update(meta, data, index.as_ref())).await; + let result = result.unwrap(); + let _ = ret.send(result); } } @@ -96,7 +112,7 @@ impl IndexActorHandle { struct MapIndexStore { root: PathBuf, meta_store: AsyncMap, - index_store: AsyncMap, + index_store: AsyncMap>, } #[async_trait::async_trait] @@ -126,10 +142,26 @@ impl IndexStore for MapIndexStore { Ok(index) }).await.expect("thread died"); - self.index_store.write().await.insert(meta.uuid.clone(), index?); + self.index_store.write().await.insert(meta.uuid.clone(), Arc::new(index?)); Ok(meta) } + + async fn get_or_create(&self, uuid: Uuid) -> Result> { + match self.index_store.write().await.entry(uuid.clone()) { + Entry::Vacant(entry) => { + match self.meta_store.write().await.entry(uuid.clone()) { + Entry::Vacant(_) => { + todo!() + } + Entry::Occupied(entry) => { + todo!() + } + } + } + Entry::Occupied(entry) => Ok(entry.get().clone()), + } + } } impl MapIndexStore { diff --git a/src/index_controller/actor_index_controller/mod.rs b/src/index_controller/actor_index_controller/mod.rs index 621bbb1d6..2936c59ea 100644 --- a/src/index_controller/actor_index_controller/mod.rs +++ b/src/index_controller/actor_index_controller/mod.rs @@ -2,6 +2,7 @@ mod index_actor; mod update_actor; mod uuid_resolver; mod update_store; +mod update_handler; use tokio::sync::oneshot; use super::IndexController; @@ -10,7 +11,6 @@ use super::IndexMetadata; use tokio::fs::File; use super::UpdateMeta; - pub struct ActorIndexController { uuid_resolver: uuid_resolver::UuidResolverHandle, index_handle: index_actor::IndexActorHandle, diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/actor_index_controller/update_actor.rs index a0bf011c5..d182ef1c8 100644 --- a/src/index_controller/actor_index_controller/update_actor.rs +++ b/src/index_controller/actor_index_controller/update_actor.rs @@ -53,11 +53,11 @@ impl UpdateActor { } } - async fn handle_update(&self, _uuid: Uuid, meta: UpdateMeta, payload: Option, ret: oneshot::Sender>) { + async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, payload: Option, ret: oneshot::Sender>) { let mut buf = Vec::new(); let mut payload = payload.unwrap(); payload.read_to_end(&mut buf).await.unwrap(); - let result = self.store.register_update(meta, &buf).unwrap(); + let result = self.store.register_update(meta, &buf, uuid).unwrap(); let _ = ret.send(Ok(UpdateStatus::Pending(result))); } } diff --git a/src/index_controller/local_index_controller/update_handler.rs b/src/index_controller/local_index_controller/update_handler.rs deleted file mode 100644 index 5781a2806..000000000 --- a/src/index_controller/local_index_controller/update_handler.rs +++ /dev/null @@ -1,255 +0,0 @@ -use std::collections::HashMap; -use std::io; -use std::sync::Arc; - -use anyhow::Result; -use flate2::read::GzDecoder; -use grenad::CompressionType; -use log::info; -use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; -use milli::Index; -use rayon::ThreadPool; - -use super::update_store::HandleUpdate; -use crate::index_controller::updates::{Failed, Processed, Processing}; -use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult}; -use crate::option::IndexerOpts; - -pub struct UpdateHandler { - index: Arc, - max_nb_chunks: Option, - chunk_compression_level: Option, - thread_pool: Arc, - log_frequency: usize, - max_memory: usize, - linked_hash_map_size: usize, - chunk_compression_type: CompressionType, - chunk_fusing_shrink_size: u64, -} - -impl UpdateHandler { - pub fn new( - opt: &IndexerOpts, - index: Arc, - thread_pool: Arc, - ) -> anyhow::Result { - Ok(Self { - index, - max_nb_chunks: opt.max_nb_chunks, - chunk_compression_level: opt.chunk_compression_level, - thread_pool, - log_frequency: opt.log_every_n, - max_memory: opt.max_memory.get_bytes() as usize, - linked_hash_map_size: opt.linked_hash_map_size, - chunk_compression_type: opt.chunk_compression_type, - chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(), - }) - } - - fn update_buidler(&self, update_id: u64) -> UpdateBuilder { - // We prepare the update by using the update builder. - let mut update_builder = UpdateBuilder::new(update_id); - if let Some(max_nb_chunks) = self.max_nb_chunks { - update_builder.max_nb_chunks(max_nb_chunks); - } - if let Some(chunk_compression_level) = self.chunk_compression_level { - update_builder.chunk_compression_level(chunk_compression_level); - } - update_builder.thread_pool(&self.thread_pool); - update_builder.log_every_n(self.log_frequency); - update_builder.max_memory(self.max_memory); - update_builder.linked_hash_map_size(self.linked_hash_map_size); - update_builder.chunk_compression_type(self.chunk_compression_type); - update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size); - update_builder - } - - fn update_documents( - &self, - format: UpdateFormat, - method: IndexDocumentsMethod, - content: &[u8], - update_builder: UpdateBuilder, - primary_key: Option<&str>, - ) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = self.index.write_txn()?; - - // Set the primary key if not set already, ignore if already set. - match (self.index.primary_key(&wtxn)?, primary_key) { - (None, Some(ref primary_key)) => { - self.index.put_primary_key(&mut wtxn, primary_key)?; - } - _ => (), - } - - let mut builder = update_builder.index_documents(&mut wtxn, &self.index); - builder.update_format(format); - builder.index_documents_method(method); - - let gzipped = true; - let reader = if gzipped && !content.is_empty() { - Box::new(GzDecoder::new(content)) - } else { - Box::new(content) as Box - }; - - let result = builder.execute(reader, |indexing_step, update_id| { - info!("update {}: {:?}", update_id, indexing_step) - }); - - match result { - Ok(addition_result) => wtxn - .commit() - .and(Ok(UpdateResult::DocumentsAddition(addition_result))) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = self.index.write_txn()?; - let builder = update_builder.clear_documents(&mut wtxn, &self.index); - - match builder.execute() { - Ok(_count) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn update_settings( - &self, - settings: &Settings, - update_builder: UpdateBuilder, - ) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = self.index.write_txn()?; - let mut builder = update_builder.settings(&mut wtxn, &self.index); - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref names) = settings.searchable_attributes { - match names { - Some(names) => builder.set_searchable_fields(names.clone()), - None => builder.reset_searchable_fields(), - } - } - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref names) = settings.displayed_attributes { - match names { - Some(names) => builder.set_displayed_fields(names.clone()), - None => builder.reset_displayed_fields(), - } - } - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref facet_types) = settings.faceted_attributes { - let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new()); - builder.set_faceted_fields(facet_types); - } - - // We transpose the settings JSON struct into a real setting update. - if let Some(ref criteria) = settings.criteria { - match criteria { - Some(criteria) => builder.set_criteria(criteria.clone()), - None => builder.reset_criteria(), - } - } - - let result = builder - .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); - - match result { - Ok(()) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn update_facets( - &self, - levels: &Facets, - update_builder: UpdateBuilder, - ) -> anyhow::Result { - // We must use the write transaction of the update here. - let mut wtxn = self.index.write_txn()?; - let mut builder = update_builder.facets(&mut wtxn, &self.index); - if let Some(value) = levels.level_group_size { - builder.level_group_size(value); - } - if let Some(value) = levels.min_level_size { - builder.min_level_size(value); - } - match builder.execute() { - Ok(()) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e.into()), - } - } - - fn delete_documents( - &self, - document_ids: &[u8], - update_builder: UpdateBuilder, - ) -> anyhow::Result { - let ids: Vec = serde_json::from_slice(document_ids)?; - let mut txn = self.index.write_txn()?; - let mut builder = update_builder.delete_documents(&mut txn, &self.index)?; - - // We ignore unexisting document ids - ids.iter().for_each(|id| { builder.delete_external_id(id); }); - - match builder.execute() { - Ok(deleted) => txn - .commit() - .and(Ok(UpdateResult::DocumentDeletion { deleted })) - .map_err(Into::into), - Err(e) => Err(e.into()) - } - } -} - -impl HandleUpdate for UpdateHandler { - fn handle_update( - &mut self, - meta: Processing, - content: &[u8], - ) -> Result, Failed> { - use UpdateMeta::*; - - let update_id = meta.id(); - - let update_builder = self.update_buidler(update_id); - - let result = match meta.meta() { - DocumentsAddition { - method, - format, - primary_key, - } => self.update_documents( - *format, - *method, - content, - update_builder, - primary_key.as_deref(), - ), - ClearDocuments => self.clear_documents(update_builder), - DeleteDocuments => self.delete_documents(content, update_builder), - Settings(settings) => self.update_settings(settings, update_builder), - Facets(levels) => self.update_facets(levels, update_builder), - }; - - match result { - Ok(result) => Ok(meta.process(result)), - Err(e) => Err(meta.fail(e.to_string())), - } - } -} diff --git a/src/index_controller/updates.rs b/src/index_controller/updates.rs index b15593b58..b2ad54a14 100644 --- a/src/index_controller/updates.rs +++ b/src/index_controller/updates.rs @@ -1,5 +1,6 @@ use chrono::{Utc, DateTime}; use serde::{Serialize, Deserialize}; +use uuid::Uuid; #[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] @@ -7,14 +8,16 @@ pub struct Pending { pub update_id: u64, pub meta: M, pub enqueued_at: DateTime, + pub index_uuid: Uuid, } impl Pending { - pub fn new(meta: M, update_id: u64) -> Self { + pub fn new(meta: M, update_id: u64, index_uuid: Uuid) -> Self { Self { enqueued_at: Utc::now(), meta, update_id, + index_uuid, } } @@ -73,6 +76,10 @@ impl Processing { self.from.meta() } + pub fn index_uuid(&self) -> &Uuid { + &self.from.index_uuid + } + pub fn process(self, meta: N) -> Processed { Processed { success: meta,