From 698a1ea5822c3ca154cf3bfdfb0a8da27660bbfb Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 7 Apr 2021 18:57:46 +0300 Subject: [PATCH] feat(http): store processing as RwLock> in index_actor --- .../src/index_controller/index_actor/actor.rs | 39 +++++++++++++------ meilisearch-http/src/index_controller/mod.rs | 8 +--- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index d56c53fcd..0620765d5 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -8,7 +8,7 @@ use futures::pin_mut; use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock}; use tokio::task::spawn_blocking; use uuid::Uuid; @@ -25,6 +25,7 @@ pub struct IndexActor { read_receiver: Option>, write_receiver: Option>, update_handler: Arc, + processing: RwLock>, store: S, } @@ -42,8 +43,9 @@ impl IndexActor { Ok(Self { read_receiver, write_receiver, - store, update_handler, + processing: RwLock::new(Default::default()), + store, }) } @@ -181,16 +183,26 @@ impl IndexActor { meta: Processing, data: File, ) -> Result { - debug!("Processing update {}", meta.id()); - let uuid = meta.index_uuid(); - let update_handler = self.update_handler.clone(); - let index = match self.store.get(*uuid).await? { - Some(index) => index, - None => self.store.create(*uuid, None).await?, + let uuid = meta.index_uuid().clone(); + + *self.processing.write().await = Some(uuid); + + let result = { + debug!("Processing update {}", meta.id()); + let update_handler = self.update_handler.clone(); + let index = match self.store.get(uuid).await? { + Some(index) => index, + None => self.store.create(uuid, None).await?, + }; + + spawn_blocking(move || update_handler.handle_update(meta, data, index)) + .await + .map_err(|e| IndexError::Error(e.into())) }; - spawn_blocking(move || update_handler.handle_update(meta, data, index)) - .await - .map_err(|e| IndexError::Error(e.into())) + + *self.processing.write().await = None; + + result } async fn handle_settings(&self, uuid: Uuid) -> Result { @@ -342,13 +354,16 @@ impl IndexActor { .await? .ok_or(IndexError::UnexistingIndex)?; + let processing = self.processing.read().await; + let is_indexing = *processing == Some(uuid); + spawn_blocking(move || { let rtxn = index.read_txn()?; Ok(IndexStats { size: index.size()?, number_of_documents: index.number_of_documents(&rtxn)?, - is_indexing: false, // We set this field in src/index_controller/mod.rs get_stats + is_indexing, fields_distribution: index.fields_distribution(&rtxn)?, }) }) diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 967506a55..e459af10c 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -354,13 +354,7 @@ impl IndexController { pub async fn get_stats(&self, uid: String) -> anyhow::Result { let uuid = self.uuid_resolver.get(uid.clone()).await?; - let stats = self.index_handle.get_index_stats(uuid); - let is_indexing = self.update_handle.is_locked(uuid); - - Ok(IndexStats { - is_indexing: is_indexing.await?, - ..stats.await? - }) + Ok(self.index_handle.get_index_stats(uuid).await?) } }