diff --git a/src/data/mod.rs b/src/data/mod.rs index ecc8a7e2e..0b57f9a55 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -62,7 +62,7 @@ impl Data { let path = options.db_path.clone(); //let indexer_opts = options.indexer_options.clone(); create_dir_all(&path)?; - let index_controller = ActorIndexController::new(); + let index_controller = ActorIndexController::new(&path); let index_controller = Arc::new(index_controller); let mut api_keys = ApiKeys { diff --git a/src/data/updates.rs b/src/data/updates.rs index 7db04cd5d..01f5174a2 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -3,9 +3,6 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat}; //use tokio::io::AsyncWriteExt; use actix_web::web::Payload; -use tokio::fs::File; -use tokio::io::{AsyncWriteExt, AsyncSeekExt}; -use futures::prelude::stream::StreamExt; use crate::index_controller::UpdateStatus; use crate::index_controller::{Settings, IndexMetadata}; @@ -17,18 +14,11 @@ impl Data { index: impl AsRef + Send + Sync + 'static, method: IndexDocumentsMethod, format: UpdateFormat, - mut stream: Payload, + stream: Payload, primary_key: Option, ) -> anyhow::Result { - let file = tempfile::tempfile_in(".")?; - let mut file = File::from_std(file); - while let Some(item) = stream.next().await { - file.write_all(&item?).await?; - } - file.seek(std::io::SeekFrom::Start(0)).await?; - file.flush().await?; - let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, file, primary_key).await?; + let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, stream, primary_key).await?; Ok(update_status) } diff --git a/src/index_controller/actor_index_controller/mod.rs b/src/index_controller/actor_index_controller/mod.rs index 646d9cf45..58e58a45a 100644 --- a/src/index_controller/actor_index_controller/mod.rs +++ b/src/index_controller/actor_index_controller/mod.rs @@ -4,25 +4,29 @@ mod uuid_resolver; mod update_store; mod update_handler; -use tokio::sync::oneshot; +use std::path::Path; + +use tokio::sync::{mpsc, oneshot}; use super::IndexController; use uuid::Uuid; use super::IndexMetadata; -use tokio::fs::File; +use futures::stream::StreamExt; +use actix_web::web::Payload; use super::UpdateMeta; use crate::data::{SearchResult, SearchQuery}; +use actix_web::web::Bytes; pub struct ActorIndexController { uuid_resolver: uuid_resolver::UuidResolverHandle, index_handle: index_actor::IndexActorHandle, - update_handle: update_actor::UpdateActorHandle, + update_handle: update_actor::UpdateActorHandle, } impl ActorIndexController { - pub fn new() -> Self { + pub fn new(path: impl AsRef) -> Self { let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); let index_actor = index_actor::IndexActorHandle::new(); - let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone()); + let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path); Self { uuid_resolver, index_handle: index_actor, update_handle } } } @@ -43,12 +47,22 @@ impl IndexController for ActorIndexController { index: String, method: milli::update::IndexDocumentsMethod, format: milli::update::UpdateFormat, - data: File, + mut payload: Payload, primary_key: Option, ) -> anyhow::Result { let uuid = self.uuid_resolver.get_or_create(index).await?; let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; - let status = self.update_handle.update(meta, Some(data), uuid).await?; + let (sender, receiver) = mpsc::channel(10); + + // It is necessary to spawn a local task to senf the payload to the update handle to + // prevent dead_locking between the update_handle::update that waits for the update to be + // registered and the update_actor that waits for the the payload to be sent to it. + tokio::task::spawn_local(async move { + while let Some(bytes) = payload.next().await { + sender.send(bytes.unwrap()).await; + } + }); + let status = self.update_handle.update(meta, receiver, uuid).await?; Ok(status) } diff --git a/src/index_controller/actor_index_controller/update_actor.rs b/src/index_controller/actor_index_controller/update_actor.rs index 3d783009f..6fc873715 100644 --- a/src/index_controller/actor_index_controller/update_actor.rs +++ b/src/index_controller/actor_index_controller/update_actor.rs @@ -1,22 +1,24 @@ -use super::index_actor::IndexActorHandle; -use uuid::Uuid; -use tokio::sync::{mpsc, oneshot}; -use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult}; -use thiserror::Error; -use tokio::io::AsyncReadExt; -use log::info; -use tokio::fs::File; -use std::path::PathBuf; use std::fs::create_dir_all; +use std::path::{Path, PathBuf}; use std::sync::Arc; +use log::info; +use super::index_actor::IndexActorHandle; +use thiserror::Error; +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult, updates::Pending}; + pub type Result = std::result::Result; type UpdateStore = super::update_store::UpdateStore; #[derive(Debug, Error)] pub enum UpdateError {} -enum UpdateMsg { +enum UpdateMsg { CreateIndex{ uuid: Uuid, ret: oneshot::Sender>, @@ -24,20 +26,30 @@ enum UpdateMsg { Update { uuid: Uuid, meta: UpdateMeta, - payload: Option, + data: mpsc::Receiver, ret: oneshot::Sender> } } -struct UpdateActor { +struct UpdateActor { + path: PathBuf, store: Arc, - inbox: mpsc::Receiver, + inbox: mpsc::Receiver>, index_handle: IndexActorHandle, } -impl UpdateActor { - fn new(store: Arc, inbox: mpsc::Receiver, index_handle: IndexActorHandle) -> Self { - Self { store, inbox, index_handle } +impl UpdateActor +where D: AsRef<[u8]> + Sized + 'static, +{ + fn new( + store: Arc, + inbox: mpsc::Receiver>, + index_handle: IndexActorHandle, + path: impl AsRef, + ) -> Self { + let path = path.as_ref().to_owned().join("update_files"); + create_dir_all(&path).unwrap(); + Self { store, inbox, index_handle, path } } async fn run(mut self) { @@ -45,29 +57,43 @@ impl UpdateActor { loop { match self.inbox.recv().await { - Some(UpdateMsg::Update { uuid, meta, payload, ret }) => self.handle_update(uuid, meta, payload, ret).await, + Some(UpdateMsg::Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await, Some(_) => {} None => {} } } } - async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, payload: Option, ret: oneshot::Sender>) { - let mut buf = Vec::new(); - let mut payload = payload.unwrap(); - payload.read_to_end(&mut buf).await.unwrap(); - let result = self.store.register_update(meta, &buf, uuid).unwrap(); + async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver, ret: oneshot::Sender>) { + let store = self.store.clone(); + let update_file_id = uuid::Uuid::new_v4(); + let path = self.path.join(format!("update_{}", update_file_id)); + let mut file = File::create(&path).await.unwrap(); + + while let Some(bytes) = payload.recv().await { + file.write_all(bytes.as_ref()).await; + } + + file.flush().await; + + let file = file.into_std().await; + + let result = tokio::task::spawn_blocking(move || -> anyhow::Result> { + Ok(store.register_update(meta, path, uuid)?) + }).await.unwrap().unwrap(); let _ = ret.send(Ok(UpdateStatus::Pending(result))); } } #[derive(Clone)] -pub struct UpdateActorHandle { - sender: mpsc::Sender, +pub struct UpdateActorHandle { + sender: mpsc::Sender>, } -impl UpdateActorHandle { - pub fn new(index_handle: IndexActorHandle) -> Self { +impl UpdateActorHandle +where D: AsRef<[u8]> + Sized + 'static, +{ + pub fn new(index_handle: IndexActorHandle, path: impl AsRef) -> Self { let (sender, receiver) = mpsc::channel(100); let mut options = heed::EnvOpenOptions::new(); options.map_size(4096 * 100_000); @@ -79,16 +105,16 @@ impl UpdateActorHandle { let store = UpdateStore::open(options, &path, move |meta, file| { futures::executor::block_on(index_handle_clone.update(meta, file)) }).unwrap(); - let actor = UpdateActor::new(store, receiver, index_handle); + let actor = UpdateActor::new(store, receiver, index_handle, path); tokio::task::spawn_local(actor.run()); Self { sender } } - pub async fn update(&self, meta: UpdateMeta, payload: Option, uuid: Uuid) -> Result { + pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Update { uuid, - payload, + data, meta, ret, }; diff --git a/src/index_controller/actor_index_controller/update_store.rs b/src/index_controller/actor_index_controller/update_store.rs index d14f47f05..ae4bfb8d8 100644 --- a/src/index_controller/actor_index_controller/update_store.rs +++ b/src/index_controller/actor_index_controller/update_store.rs @@ -1,9 +1,9 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use std::io::{Cursor, SeekFrom, Seek, Write}; +use std::fs::remove_file; use crossbeam_channel::Sender; -use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; +use heed::types::{OwnedType, DecodeIgnore, SerdeJson}; use heed::{EnvOpenOptions, Env, Database}; use serde::{Serialize, Deserialize}; use std::fs::File; @@ -17,7 +17,7 @@ type BEU64 = heed::zerocopy::U64; pub struct UpdateStore { env: Env, pending_meta: Database, SerdeJson>>, - pending: Database, ByteSlice>, + pending: Database, SerdeJson>, processed_meta: Database, SerdeJson>>, failed_meta: Database, SerdeJson>>, aborted_meta: Database, SerdeJson>>, @@ -140,7 +140,7 @@ where pub fn register_update( &self, meta: M, - content: &[u8], + content: impl AsRef, index_uuid: Uuid, ) -> heed::Result> { let mut wtxn = self.env.write_txn()?; @@ -154,7 +154,7 @@ where let meta = Pending::new(meta, update_id, index_uuid); self.pending_meta.put(&mut wtxn, &update_key, &meta)?; - self.pending.put(&mut wtxn, &update_key, content)?; + self.pending.put(&mut wtxn, &update_key, &content.as_ref().to_owned())?; wtxn.commit()?; @@ -178,7 +178,7 @@ where // a reader while processing it, not a writer. match first_meta { Some((first_id, pending)) => { - let first_content = self.pending + let content_path = self.pending .get(&rtxn, &first_id)? .expect("associated update content"); @@ -190,12 +190,7 @@ where .write() .unwrap() .replace(processing.clone()); - let mut cursor = Cursor::new(first_content); - let mut file = tempfile::tempfile()?; - let n = std::io::copy(&mut cursor, &mut file)?; - println!("copied count: {}", n); - file.flush()?; - file.seek(SeekFrom::Start(0))?; + let file = File::open(&content_path)?; // Process the pending update using the provided user function. let result = handler.handle_update(processing, file); drop(rtxn); @@ -209,6 +204,7 @@ where .unwrap() .take(); self.pending_meta.delete(&mut wtxn, &first_id)?; + remove_file(&content_path)?; self.pending.delete(&mut wtxn, &first_id)?; match result { Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index cee276ea0..c3a09b2aa 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -12,7 +12,7 @@ use milli::Index; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid; -use tokio::fs::File; +use actix_web::web::Payload; use crate::data::SearchResult; use crate::data::SearchQuery; @@ -133,7 +133,7 @@ pub trait IndexController { index: String, method: IndexDocumentsMethod, format: UpdateFormat, - data: File, + data: Payload, primary_key: Option, ) -> anyhow::Result;