Mirror of https://github.com/meilisearch/MeiliSearch, synced 2025-07-04 20:37:15 +02:00
refactor update actor
commit 12542bf922 (parent def737edee)
25 changed files with 253 additions and 297 deletions
61  meilisearch-lib/src/index_controller/updates/error.rs  Normal file
@@ -0,0 +1,61 @@
use std::error::Error;

use meilisearch_error::{Code, ErrorCode};

use crate::index_controller::index_actor::error::IndexActorError;

pub type Result<T> = std::result::Result<T, UpdateActorError>;

#[derive(Debug, thiserror::Error)]
#[allow(clippy::large_enum_variant)]
pub enum UpdateActorError {
    #[error("Update {0} not found.")]
    UnexistingUpdate(u64),
    #[error("Internal error: {0}")]
    Internal(Box<dyn Error + Send + Sync + 'static>),
    #[error("{0}")]
    IndexActor(#[from] IndexActorError),
    #[error(
        "update store was shut down due to a fatal error, please check your logs for more info."
    )]
    FatalUpdateStoreError,
    #[error("{0}")]
    InvalidPayload(Box<dyn Error + Send + Sync + 'static>),
    #[error("{0}")]
    PayloadError(#[from] actix_web::error::PayloadError),
}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateActorError {
    fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
        Self::FatalUpdateStoreError
    }
}

impl From<tokio::sync::oneshot::error::RecvError> for UpdateActorError {
    fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
        Self::FatalUpdateStoreError
    }
}

internal_error!(
    UpdateActorError: heed::Error,
    std::io::Error,
    serde_json::Error,
    tokio::task::JoinError
);

impl ErrorCode for UpdateActorError {
    fn error_code(&self) -> Code {
        match self {
            UpdateActorError::UnexistingUpdate(_) => Code::NotFound,
            UpdateActorError::Internal(_) => Code::Internal,
            UpdateActorError::IndexActor(e) => e.error_code(),
            UpdateActorError::FatalUpdateStoreError => Code::Internal,
            UpdateActorError::InvalidPayload(_) => Code::BadRequest,
            UpdateActorError::PayloadError(error) => match error {
                actix_web::error::PayloadError::Overflow => Code::PayloadTooLarge,
                _ => Code::Internal,
            },
        }
    }
}
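Note — illustrative sketch, not part of this commit: the two From impls above let the actor code surface any channel failure as a single fatal error, so a plain `?` on a send or receive is enough. The helper below is hypothetical and only shows the conversion path.

async fn ping(sender: &tokio::sync::mpsc::Sender<()>) -> Result<()> {
    // A SendError<_> here is converted by `?` into UpdateActorError::FatalUpdateStoreError.
    sender.send(()).await?;
    Ok(())
}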
112  meilisearch-lib/src/index_controller/updates/message.rs  Normal file
@@ -0,0 +1,112 @@
use std::collections::HashSet;
use std::path::PathBuf;

use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use super::error::Result;
use super::{Update, UpdateStatus, UpdateStoreInfo};

pub enum UpdateMsg {
    Update {
        uuid: Uuid,
        update: Update,
        ret: oneshot::Sender<Result<UpdateStatus>>,
    },
    ListUpdates {
        uuid: Uuid,
        ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
    },
    GetUpdate {
        uuid: Uuid,
        ret: oneshot::Sender<Result<UpdateStatus>>,
        id: u64,
    },
    Delete {
        uuid: Uuid,
        ret: oneshot::Sender<Result<()>>,
    },
    Snapshot {
        uuids: HashSet<Uuid>,
        path: PathBuf,
        ret: oneshot::Sender<Result<()>>,
    },
    Dump {
        uuids: HashSet<Uuid>,
        path: PathBuf,
        ret: oneshot::Sender<Result<()>>,
    },
    GetInfo {
        ret: oneshot::Sender<Result<UpdateStoreInfo>>,
    },
}

impl UpdateMsg {
    pub async fn dump(
        sender: &mpsc::Sender<Self>,
        uuids: HashSet<Uuid>,
        path: PathBuf,
    ) -> Result<()> {
        let (ret, rcv) = oneshot::channel();
        let msg = Self::Dump { path, uuids, ret };
        sender.send(msg).await?;
        rcv.await?
    }

    pub async fn update(
        sender: &mpsc::Sender<Self>,
        uuid: Uuid,
        update: Update,
    ) -> Result<UpdateStatus> {
        let (ret, rcv) = oneshot::channel();
        let msg = Self::Update { uuid, update, ret };
        sender.send(msg).await?;
        rcv.await?
    }

    pub async fn get_update(
        sender: &mpsc::Sender<Self>,
        uuid: Uuid,
        id: u64,
    ) -> Result<UpdateStatus> {
        let (ret, rcv) = oneshot::channel();
        let msg = Self::GetUpdate { uuid, id, ret };
        sender.send(msg).await?;
        rcv.await?
    }

    pub async fn list_updates(
        sender: &mpsc::Sender<Self>,
        uuid: Uuid,
    ) -> Result<Vec<UpdateStatus>> {
        let (ret, rcv) = oneshot::channel();
        let msg = Self::ListUpdates { uuid, ret };
        sender.send(msg).await?;
        rcv.await?
    }

    pub async fn get_info(
        sender: &mpsc::Sender<Self>,
    ) -> Result<UpdateStoreInfo> {
        let (ret, rcv) = oneshot::channel();
        let msg = Self::GetInfo { ret };
        sender.send(msg).await?;
        rcv.await?
    }
}
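Note — illustrative sketch, not part of this commit: each helper on UpdateMsg hides the request/response plumbing (an mpsc send plus a oneshot reply), so a caller only needs the sender half returned by create_update_handler. The function below is hypothetical.

async fn print_store_size(sender: &UpdateSender) -> Result<()> {
    // get_info builds the GetInfo message, sends it, and awaits the oneshot reply.
    let info = UpdateMsg::get_info(sender).await?;
    println!("update store size: {} bytes", info.size);
    Ok(())
}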
299  meilisearch-lib/src/index_controller/updates/mod.rs  Normal file
@@ -0,0 +1,299 @@
pub mod error;
mod message;
pub mod status;
pub mod store;

use std::collections::HashSet;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use actix_web::error::PayloadError;
use async_stream::stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use log::trace;
use milli::documents::DocumentBatchBuilder;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use tokio::sync::mpsc;
use uuid::Uuid;

use self::error::{Result, UpdateActorError};
pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo};
use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus;

use super::{DocumentAdditionFormat, Payload, Update};

pub type UpdateSender = mpsc::Sender<UpdateMsg>;
type IndexSender = mpsc::Sender<()>;

pub fn create_update_handler(
    index_sender: IndexSender,
    db_path: impl AsRef<Path>,
    update_store_size: usize,
) -> anyhow::Result<UpdateSender> {
    let path = db_path.as_ref().to_owned();
    let (sender, receiver) = mpsc::channel(100);
    let actor = UpdateHandler::new(update_store_size, receiver, path, index_sender)?;

    tokio::task::spawn_local(actor.run());

    Ok(sender)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RegisterUpdate {
    DocumentAddition {
        primary_key: Option<String>,
        method: IndexDocumentsMethod,
        content_uuid: Uuid,
    },
}

/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
struct StreamReader<S> {
    stream: S,
    current: Option<Bytes>,
}

impl<S> StreamReader<S> {
    fn new(stream: S) -> Self {
        Self {
            stream,
            current: None,
        }
    }
}

impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read
    for StreamReader<S>
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // TODO: optimize buf filling
        match self.current.take() {
            Some(mut bytes) => {
                let copied = bytes.split_to(buf.len());
                buf.copy_from_slice(&copied);
                if !bytes.is_empty() {
                    self.current.replace(bytes);
                }
                Ok(copied.len())
            }
            None => match tokio::runtime::Handle::current().block_on(self.stream.next()) {
                Some(Ok(bytes)) => {
                    self.current.replace(bytes);
                    self.read(buf)
                }
                Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
                None => return Ok(0),
            },
        }
    }
}

pub struct UpdateHandler {
    store: Arc<UpdateStore>,
    inbox: Option<mpsc::Receiver<UpdateMsg>>,
    update_file_store: UpdateFileStore,
    index_handle: IndexSender,
    must_exit: Arc<AtomicBool>,
}

impl UpdateHandler {
    pub fn new(
        update_db_size: usize,
        inbox: mpsc::Receiver<UpdateMsg>,
        path: impl AsRef<Path>,
        index_handle: IndexSender,
    ) -> anyhow::Result<Self> {
        let path = path.as_ref().to_owned();
        std::fs::create_dir_all(&path)?;

        let mut options = heed::EnvOpenOptions::new();
        options.map_size(update_db_size);

        let must_exit = Arc::new(AtomicBool::new(false));

        let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;

        let inbox = Some(inbox);

        let update_file_store = UpdateFileStore::new(&path).unwrap();

        Ok(Self {
            store,
            inbox,
            index_handle,
            must_exit,
            update_file_store,
        })
    }

    pub async fn run(mut self) {
        use UpdateMsg::*;

        trace!("Started update actor.");

        let mut inbox = self
            .inbox
            .take()
            .expect("A receiver should be present by now.");

        let must_exit = self.must_exit.clone();
        let stream = stream! {
            loop {
                let msg = inbox.recv().await;

                if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
                    break;
                }

                match msg {
                    Some(msg) => yield msg,
                    None => break,
                }
            }
        };

        stream
            .for_each_concurrent(Some(10), |msg| async {
                match msg {
                    Update { uuid, update, ret } => {
                        let _ = ret.send(self.handle_update(uuid, update).await);
                    }
                    ListUpdates { uuid, ret } => {
                        let _ = ret.send(self.handle_list_updates(uuid).await);
                    }
                    GetUpdate { uuid, ret, id } => {
                        let _ = ret.send(self.handle_get_update(uuid, id).await);
                    }
                    Delete { uuid, ret } => {
                        let _ = ret.send(self.handle_delete(uuid).await);
                    }
                    Snapshot { uuids, path, ret } => {
                        let _ = ret.send(self.handle_snapshot(uuids, path).await);
                    }
                    GetInfo { ret } => {
                        let _ = ret.send(self.handle_get_info().await);
                    }
                    Dump { uuids, path, ret } => {
                        let _ = ret.send(self.handle_dump(uuids, path).await);
                    }
                }
            })
            .await;
    }

    async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result<UpdateStatus> {
        let registration = match update {
            Update::DocumentAddition {
                payload,
                primary_key,
                method,
                format,
            } => {
                let content_uuid = match format {
                    DocumentAdditionFormat::Json => self.documents_from_json(payload).await?,
                };

                RegisterUpdate::DocumentAddition {
                    primary_key,
                    method,
                    content_uuid,
                }
            }
        };

        let store = self.store.clone();
        let status =
            tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration))
                .await??;

        Ok(status.into())
    }

    async fn documents_from_json(&self, payload: Payload) -> Result<Uuid> {
        let file_store = self.update_file_store.clone();
        tokio::task::spawn_blocking(move || {
            let (uuid, mut file) = file_store.new_update().unwrap();
            let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap();

            let documents: Vec<Map<String, Value>> =
                serde_json::from_reader(StreamReader::new(payload))?;
            builder.add_documents(documents).unwrap();
            builder.finish().unwrap();

            file.persist();

            Ok(uuid)
        })
        .await?
    }

    async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
        let update_store = self.store.clone();
        tokio::task::spawn_blocking(move || {
            let result = update_store.list(uuid)?;
            Ok(result)
        })
        .await?
    }

    async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
        let store = self.store.clone();
        tokio::task::spawn_blocking(move || {
            let result = store
                .meta(uuid, id)?
                .ok_or(UpdateActorError::UnexistingUpdate(id))?;
            Ok(result)
        })
        .await?
    }

    async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
        let store = self.store.clone();

        tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;

        Ok(())
    }

    async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
        let index_handle = self.index_handle.clone();
        let update_store = self.store.clone();

        tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
            .await??;

        Ok(())
    }

    async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
        let index_handle = self.index_handle.clone();
        let update_store = self.store.clone();

        tokio::task::spawn_blocking(move || -> Result<()> {
            update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
            Ok(())
        })
        .await??;

        Ok(())
    }

    async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
        let update_store = self.store.clone();
        let info = tokio::task::spawn_blocking(move || -> Result<UpdateStoreInfo> {
            let info = update_store.get_info()?;
            Ok(info)
        })
        .await??;

        Ok(info)
    }
}
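Note — illustrative sketch, not part of this commit: the chunk-copy step in StreamReader::read (the part flagged by the "TODO: optimize buf filling" comment) uses Bytes::split_to(buf.len()), which panics if the pending chunk is shorter than the caller's buffer. A bounds-checked variant of just that step could look like the hypothetical helper below.

fn copy_chunk(bytes: &mut bytes::Bytes, buf: &mut [u8]) -> usize {
    // Clamp to what is actually available so split_to cannot panic.
    let n = buf.len().min(bytes.len());
    let chunk = bytes.split_to(n);
    buf[..n].copy_from_slice(&chunk);
    n
}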
248  meilisearch-lib/src/index_controller/updates/status.rs  Normal file
@@ -0,0 +1,248 @@
use std::{error::Error, fmt::Display};

use chrono::{DateTime, Utc};

use meilisearch_error::{Code, ErrorCode};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize};

use crate::{RegisterUpdate, index::{Settings, Unchecked}};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
    DocumentsAddition(DocumentAdditionResult),
    DocumentDeletion { deleted: u64 },
    Other,
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
    DocumentsAddition {
        method: IndexDocumentsMethod,
        primary_key: Option<String>,
    },
    ClearDocuments,
    DeleteDocuments {
        ids: Vec<String>,
    },
    Settings(Settings<Unchecked>),
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Enqueued {
    pub update_id: u64,
    pub meta: RegisterUpdate,
    pub enqueued_at: DateTime<Utc>,
}

impl Enqueued {
    pub fn new(meta: RegisterUpdate, update_id: u64) -> Self {
        Self {
            enqueued_at: Utc::now(),
            meta,
            update_id,
        }
    }

    pub fn processing(self) -> Processing {
        Processing {
            from: self,
            started_processing_at: Utc::now(),
        }
    }

    pub fn abort(self) -> Aborted {
        Aborted {
            from: self,
            aborted_at: Utc::now(),
        }
    }

    pub fn meta(&self) -> &RegisterUpdate {
        &self.meta
    }

    pub fn id(&self) -> u64 {
        self.update_id
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processed {
    pub success: UpdateResult,
    pub processed_at: DateTime<Utc>,
    #[serde(flatten)]
    pub from: Processing,
}

impl Processed {
    pub fn id(&self) -> u64 {
        self.from.id()
    }

    pub fn meta(&self) -> &RegisterUpdate {
        self.from.meta()
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processing {
    #[serde(flatten)]
    pub from: Enqueued,
    pub started_processing_at: DateTime<Utc>,
}

impl Processing {
    pub fn id(&self) -> u64 {
        self.from.id()
    }

    pub fn meta(&self) -> &RegisterUpdate {
        self.from.meta()
    }

    pub fn process(self, success: UpdateResult) -> Processed {
        Processed {
            success,
            from: self,
            processed_at: Utc::now(),
        }
    }

    pub fn fail(self, error: impl ErrorCode) -> Failed {
        let msg = error.to_string();
        let code = error.error_code();
        Failed {
            from: self,
            msg,
            code,
            failed_at: Utc::now(),
        }
    }
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Aborted {
    #[serde(flatten)]
    from: Enqueued,
    aborted_at: DateTime<Utc>,
}

impl Aborted {
    pub fn id(&self) -> u64 {
        self.from.id()
    }

    pub fn meta(&self) -> &RegisterUpdate {
        self.from.meta()
    }
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
    #[serde(flatten)]
    pub from: Processing,
    pub msg: String,
    pub code: Code,
    pub failed_at: DateTime<Utc>,
}

impl Display for Failed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.msg.fmt(f)
    }
}

impl Error for Failed {}

impl ErrorCode for Failed {
    fn error_code(&self) -> Code {
        self.code
    }
}

impl Failed {
    pub fn id(&self) -> u64 {
        self.from.id()
    }

    pub fn meta(&self) -> &RegisterUpdate {
        self.from.meta()
    }
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus {
    Processing(Processing),
    Enqueued(Enqueued),
    Processed(Processed),
    Aborted(Aborted),
    Failed(Failed),
}

impl UpdateStatus {
    pub fn id(&self) -> u64 {
        match self {
            UpdateStatus::Processing(u) => u.id(),
            UpdateStatus::Enqueued(u) => u.id(),
            UpdateStatus::Processed(u) => u.id(),
            UpdateStatus::Aborted(u) => u.id(),
            UpdateStatus::Failed(u) => u.id(),
        }
    }

    pub fn meta(&self) -> &RegisterUpdate {
        match self {
            UpdateStatus::Processing(u) => u.meta(),
            UpdateStatus::Enqueued(u) => u.meta(),
            UpdateStatus::Processed(u) => u.meta(),
            UpdateStatus::Aborted(u) => u.meta(),
            UpdateStatus::Failed(u) => u.meta(),
        }
    }

    pub fn processed(&self) -> Option<&Processed> {
        match self {
            UpdateStatus::Processed(p) => Some(p),
            _ => None,
        }
    }
}

impl From<Enqueued> for UpdateStatus {
    fn from(other: Enqueued) -> Self {
        Self::Enqueued(other)
    }
}

impl From<Aborted> for UpdateStatus {
    fn from(other: Aborted) -> Self {
        Self::Aborted(other)
    }
}

impl From<Processed> for UpdateStatus {
    fn from(other: Processed) -> Self {
        Self::Processed(other)
    }
}

impl From<Processing> for UpdateStatus {
    fn from(other: Processing) -> Self {
        Self::Processing(other)
    }
}

impl From<Failed> for UpdateStatus {
    fn from(other: Failed) -> Self {
        Self::Failed(other)
    }
}
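Note — illustrative sketch, not part of this commit: the status types form a small state machine where each transition consumes the previous stage and stamps a timestamp on the next one. Assuming a RegisterUpdate value built elsewhere in the crate, a full happy-path transition looks like this hypothetical helper.

fn lifecycle(reg: RegisterUpdate) -> UpdateStatus {
    // Enqueued -> Processing -> Processed, then wrapped into the tagged UpdateStatus enum.
    let enqueued = Enqueued::new(reg, 0);
    let processing = enqueued.processing();
    let processed = processing.process(UpdateResult::Other);
    UpdateStatus::from(processed)
}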
86  meilisearch-lib/src/index_controller/updates/store/codec.rs  Normal file
@@ -0,0 +1,86 @@
use std::{borrow::Cow, convert::TryInto, mem::size_of};

use heed::{BytesDecode, BytesEncode};
use uuid::Uuid;

pub struct NextIdCodec;

pub enum NextIdKey {
    Global,
    Index(Uuid),
}

impl<'a> BytesEncode<'a> for NextIdCodec {
    type EItem = NextIdKey;

    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        match item {
            NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
            NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
        }
    }
}

pub struct PendingKeyCodec;

impl<'a> BytesEncode<'a> for PendingKeyCodec {
    type EItem = (u64, Uuid, u64);

    fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
        bytes.extend_from_slice(&global_id.to_be_bytes());
        bytes.extend_from_slice(uuid.as_bytes());
        bytes.extend_from_slice(&update_id.to_be_bytes());
        Some(Cow::Owned(bytes))
    }
}

impl<'a> BytesDecode<'a> for PendingKeyCodec {
    type DItem = (u64, Uuid, u64);

    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
        let global_id = u64::from_be_bytes(global_id_bytes);

        let uuid_bytes = bytes
            .get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
            .try_into()
            .ok()?;
        let uuid = Uuid::from_bytes(uuid_bytes);

        let update_id_bytes = bytes
            .get((size_of::<u64>() + size_of::<Uuid>())..)?
            .try_into()
            .ok()?;
        let update_id = u64::from_be_bytes(update_id_bytes);

        Some((global_id, uuid, update_id))
    }
}

pub struct UpdateKeyCodec;

impl<'a> BytesEncode<'a> for UpdateKeyCodec {
    type EItem = (Uuid, u64);

    fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
        bytes.extend_from_slice(uuid.as_bytes());
        bytes.extend_from_slice(&update_id.to_be_bytes());
        Some(Cow::Owned(bytes))
    }
}

impl<'a> BytesDecode<'a> for UpdateKeyCodec {
    type DItem = (Uuid, u64);

    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
        let uuid = Uuid::from_bytes(uuid_bytes);

        let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
        let update_id = u64::from_be_bytes(update_id_bytes);

        Some((uuid, update_id))
    }
}
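Note — illustrative sketch, not part of this commit: PendingKeyCodec produces a fixed 32-byte key (8-byte big-endian global_id, 16-byte uuid, 8-byte big-endian update_id), so LMDB's lexicographic key ordering matches arrival order and the oldest pending update sorts first. The hypothetical function below spells out that layout without heed.

fn pending_key(global_id: u64, uuid: uuid::Uuid, update_id: u64) -> [u8; 32] {
    // 8 bytes global_id | 16 bytes uuid | 8 bytes update_id
    let mut key = [0u8; 32];
    key[..8].copy_from_slice(&global_id.to_be_bytes());
    key[8..24].copy_from_slice(uuid.as_bytes());
    key[24..].copy_from_slice(&update_id.to_be_bytes());
    key
}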
184  meilisearch-lib/src/index_controller/updates/store/dump.rs  Normal file
@@ -0,0 +1,184 @@
use std::{
    collections::HashSet,
    fs::{create_dir_all, File},
    io::Write,
    path::{Path, PathBuf},
};

use heed::RoTxn;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::{Result, State, UpdateStore};
use crate::index_controller::{updates::{IndexSender, status::UpdateStatus}};

#[derive(Serialize, Deserialize)]
struct UpdateEntry {
    uuid: Uuid,
    update: UpdateStatus,
}

impl UpdateStore {
    pub fn dump(
        &self,
        uuids: &HashSet<Uuid>,
        path: PathBuf,
        handle: IndexSender,
    ) -> Result<()> {
        let state_lock = self.state.write();
        state_lock.swap(State::Dumping);

        // txn must *always* be acquired after state lock, or it will dead lock.
        let txn = self.env.write_txn()?;

        let dump_path = path.join("updates");
        create_dir_all(&dump_path)?;

        self.dump_updates(&txn, uuids, &dump_path)?;

        let fut = dump_indexes(uuids, handle, &path);
        tokio::runtime::Handle::current().block_on(fut)?;

        state_lock.swap(State::Idle);

        Ok(())
    }

    fn dump_updates(
        &self,
        txn: &RoTxn,
        uuids: &HashSet<Uuid>,
        path: impl AsRef<Path>,
    ) -> Result<()> {
        let dump_data_path = path.as_ref().join("data.jsonl");
        let mut dump_data_file = File::create(dump_data_path)?;

        let update_files_path = path.as_ref().join(super::UPDATE_DIR);
        create_dir_all(&update_files_path)?;

        self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
        self.dump_completed(txn, uuids, &mut dump_data_file)?;

        Ok(())
    }

    fn dump_pending(
        &self,
        _txn: &RoTxn,
        _uuids: &HashSet<Uuid>,
        _file: &mut File,
        _dst_path: impl AsRef<Path>,
    ) -> Result<()> {
        todo!()
        //let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();

        //for pending in pendings {
        //    let ((_, uuid, _), data) = pending?;
        //    if uuids.contains(&uuid) {
        //        let update = data.decode()?;

        //        if let Some(ref update_uuid) = update.content {
        //            let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
        //            let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
        //            std::fs::copy(src, dst)?;
        //        }

        //        let update_json = UpdateEntry {
        //            uuid,
        //            update: update.into(),
        //        };

        //        serde_json::to_writer(&mut file, &update_json)?;
        //        file.write_all(b"\n")?;
        //    }
        //}

        //Ok(())
    }

    fn dump_completed(
        &self,
        txn: &RoTxn,
        uuids: &HashSet<Uuid>,
        mut file: &mut File,
    ) -> Result<()> {
        let updates = self.updates.iter(txn)?.lazily_decode_data();

        for update in updates {
            let ((uuid, _), data) = update?;
            if uuids.contains(&uuid) {
                let update = data.decode()?;

                let update_json = UpdateEntry { uuid, update };

                serde_json::to_writer(&mut file, &update_json)?;
                file.write_all(b"\n")?;
            }
        }

        Ok(())
    }

    pub fn load_dump(
        _src: impl AsRef<Path>,
        _dst: impl AsRef<Path>,
        _db_size: usize,
    ) -> anyhow::Result<()> {
        todo!()
        //let dst_update_path = dst.as_ref().join("updates/");
        //create_dir_all(&dst_update_path)?;

        //let mut options = EnvOpenOptions::new();
        //options.map_size(db_size as usize);
        //let (store, _) = UpdateStore::new(options, &dst_update_path)?;

        //let src_update_path = src.as_ref().join("updates");
        //let update_data = File::open(&src_update_path.join("data.jsonl"))?;
        //let mut update_data = BufReader::new(update_data);

        //std::fs::create_dir_all(dst_update_path.join("update_files/"))?;

        //let mut wtxn = store.env.write_txn()?;
        //let mut line = String::new();
        //loop {
        //    match update_data.read_line(&mut line) {
        //        Ok(0) => break,
        //        Ok(_) => {
        //            let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
        //            store.register_raw_updates(&mut wtxn, &update, uuid)?;

        //            // Copy ascociated update path if it exists
        //            if let UpdateStatus::Enqueued(Enqueued {
        //                content: Some(uuid),
        //                ..
        //            }) = update
        //            {
        //                let src = update_uuid_to_file_path(&src_update_path, uuid);
        //                let dst = update_uuid_to_file_path(&dst_update_path, uuid);
        //                std::fs::copy(src, dst)?;
        //            }
        //        }
        //        _ => break,
        //    }

        //    line.clear();
        //}

        //wtxn.commit()?;

        //Ok(())
    }
}

async fn dump_indexes(
    uuids: &HashSet<Uuid>,
    handle: IndexSender,
    path: impl AsRef<Path>,
) -> Result<()> {
    for uuid in uuids {
        //handle.dump(*uuid, path.as_ref().to_owned()).await?;
        todo!()
    }

    Ok(())
}
709  meilisearch-lib/src/index_controller/updates/store/mod.rs  Normal file
@@ -0,0 +1,709 @@
mod codec;
pub mod dump;

use std::fs::{create_dir_all, remove_file};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
    collections::{BTreeMap, HashSet},
    path::PathBuf,
    time::Duration,
};

use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use log::error;
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::timeout;
use uuid::Uuid;

use codec::*;

use super::RegisterUpdate;
use super::error::Result;
use super::status::{Enqueued, Processing};
use crate::EnvSizer;
use crate::index_controller::update_files_path;
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*};

#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;

const UPDATE_DIR: &str = "update_files";

pub struct UpdateStoreInfo {
    /// Size of the update store in bytes.
    pub size: u64,
    /// Uuid of the currently processing update if it exists
    pub processing: Option<Uuid>,
}

/// A data structure that allows concurrent reads AND exactly one writer.
pub struct StateLock {
    lock: Mutex<()>,
    data: ArcSwap<State>,
}

pub struct StateLockGuard<'a> {
    _lock: MutexGuard<'a, ()>,
    state: &'a StateLock,
}

impl StateLockGuard<'_> {
    pub fn swap(&self, state: State) -> Arc<State> {
        self.state.data.swap(Arc::new(state))
    }
}

impl StateLock {
    fn from_state(state: State) -> Self {
        let lock = Mutex::new(());
        let data = ArcSwap::from(Arc::new(state));
        Self { lock, data }
    }

    pub fn read(&self) -> Arc<State> {
        self.data.load().clone()
    }

    pub fn write(&self) -> StateLockGuard {
        let _lock = self.lock.lock();
        let state = &self;
        StateLockGuard { _lock, state }
    }
}

#[allow(clippy::large_enum_variant)]
pub enum State {
    Idle,
    Processing(Uuid, Processing),
    Snapshoting,
    Dumping,
}

#[derive(Clone)]
pub struct UpdateStore {
    pub env: Env,
    /// A queue containing the updates to process, ordered by arrival.
    /// The key are built as follow:
    /// | global_update_id | index_uuid | update_id |
    /// | 8-bytes          | 16-bytes   | 8-bytes   |
    pending_queue: Database<PendingKeyCodec, SerdeJson<Enqueued>>,
    /// Map indexes to the next available update id. If NextIdKey::Global is queried, then the next
    /// global update id is returned
    next_update_id: Database<NextIdCodec, OwnedType<BEU64>>,
    /// Contains all the performed updates meta, be they failed, aborted, or processed.
    /// The keys are built as follow:
    /// |    Uuid  |   id    |
    /// | 16-bytes | 8-bytes |
    updates: Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>,
    /// Indicates the current state of the update store,
    state: Arc<StateLock>,
    /// Wake up the loop when a new event occurs.
    notification_sender: mpsc::Sender<()>,
    path: PathBuf,
}

impl UpdateStore {
    fn new(
        mut options: EnvOpenOptions,
        path: impl AsRef<Path>,
    ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
        options.max_dbs(5);

        let update_path = path.as_ref().join("updates");
        std::fs::create_dir_all(&update_path)?;
        let env = options.open(update_path)?;
        let pending_queue = env.create_database(Some("pending-queue"))?;
        let next_update_id = env.create_database(Some("next-update-id"))?;
        let updates = env.create_database(Some("updates"))?;

        let state = Arc::new(StateLock::from_state(State::Idle));

        let (notification_sender, notification_receiver) = mpsc::channel(1);

        Ok((
            Self {
                env,
                pending_queue,
                next_update_id,
                updates,
                state,
                notification_sender,
                path: path.as_ref().to_owned(),
            },
            notification_receiver,
        ))
    }

    pub fn open(
        options: EnvOpenOptions,
        path: impl AsRef<Path>,
        index_handle: IndexSender,
        must_exit: Arc<AtomicBool>,
    ) -> anyhow::Result<Arc<Self>> {
        let (update_store, mut notification_receiver) = Self::new(options, path)?;
        let update_store = Arc::new(update_store);

        // Send a first notification to trigger the process.
        if let Err(TrySendError::Closed(())) = update_store.notification_sender.try_send(()) {
            panic!("Failed to init update store");
        }

        // We need a weak reference so we can take ownership on the arc later when we
        // want to close the index.
        let duration = Duration::from_secs(10 * 60); // 10 minutes
        let update_store_weak = Arc::downgrade(&update_store);
        tokio::task::spawn_local(async move {
            // Block and wait for something to process with a timeout. The timeout
            // function returns a Result and we must just unlock the loop on Result.
            'outer: while timeout(duration, notification_receiver.recv())
                .await
                .map_or(true, |o| o.is_some())
            {
                loop {
                    match update_store_weak.upgrade() {
                        Some(update_store) => {
                            let handler = index_handle.clone();
                            let res = tokio::task::spawn_blocking(move || {
                                update_store.process_pending_update(handler)
                            })
                            .await
                            .expect("Fatal error processing update.");
                            match res {
                                Ok(Some(_)) => (),
                                Ok(None) => break,
                                Err(e) => {
                                    error!("Fatal error while processing an update that requires the update store to shutdown: {}", e);
                                    must_exit.store(true, Ordering::SeqCst);
                                    break 'outer;
                                }
                            }
                        }
                        // the ownership on the arc has been taken, we need to exit.
                        None => break 'outer,
                    }
                }
            }

            error!("Update store loop exited.");
        });

        Ok(update_store)
    }

    /// Returns the next global update id and the next update id for a given `index_uuid`.
    fn next_update_id(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<(u64, u64)> {
        let global_id = self
            .next_update_id
            .get(txn, &NextIdKey::Global)?
            .map(U64::get)
            .unwrap_or_default();

        self.next_update_id
            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;

        let update_id = self.next_update_id_raw(txn, index_uuid)?;

        Ok((global_id, update_id))
    }

    /// Returns the next update id for a given `index_uuid` without
    /// incrementing the global update id. This is useful for the dumps.
    fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
        let update_id = self
            .next_update_id
            .get(txn, &NextIdKey::Index(index_uuid))?
            .map(U64::get)
            .unwrap_or_default();

        self.next_update_id.put(
            txn,
            &NextIdKey::Index(index_uuid),
            &BEU64::new(update_id + 1),
        )?;

        Ok(update_id)
    }

    /// Registers the update content in the pending store and the meta
    /// into the pending-meta store. Returns the new unique update id.
    pub fn register_update(
        &self,
        index_uuid: Uuid,
        update: RegisterUpdate,
    ) -> heed::Result<Enqueued> {
        let mut txn = self.env.write_txn()?;
        let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
        let meta = Enqueued::new(update, update_id);

        self.pending_queue
            .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;

        txn.commit()?;

        if let Err(TrySendError::Closed(())) = self.notification_sender.try_send(()) {
            panic!("Update store loop exited");
        }

        Ok(meta)
    }

    // /// Push already processed update in the UpdateStore without triggering the notification
    // /// process. This is useful for the dumps.
    //pub fn register_raw_updates(
    //    &self,
    //    wtxn: &mut heed::RwTxn,
    //    update: &UpdateStatus,
    //    index_uuid: Uuid,
    //) -> heed::Result<()> {
    //    match update {
    //        UpdateStatus::Enqueued(enqueued) => {
    //            let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
    //            self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
    //                wtxn,
    //                &(global_id, index_uuid, enqueued.id()),
    //                enqueued,
    //            )?;
    //        }
    //        _ => {
    //            let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
    //            self.updates.put(wtxn, &(index_uuid, update.id()), update)?;
    //        }
    //    }
    //    Ok(())
    //}

    /// Executes the user provided function on the next pending update (the one with the lowest id).
    /// This is asynchronous as it let the user process the update with a read-only txn and
    /// only writing the result meta to the processed-meta store *after* it has been processed.
    fn process_pending_update(&self, index_handle: IndexSender) -> Result<Option<()>> {
        // Create a read transaction to be able to retrieve the pending update in order.
        let rtxn = self.env.read_txn()?;
        let first_meta = self.pending_queue.first(&rtxn)?;
        drop(rtxn);

        // If there is a pending update we process and only keep
        // a reader while processing it, not a writer.
        match first_meta {
            Some(((global_id, index_uuid, _), pending)) => {
                let processing = pending.processing();
                // Acquire the state lock and set the current state to processing.
                // txn must *always* be acquired after state lock, or it will dead lock.
                let state = self.state.write();
                state.swap(State::Processing(index_uuid, processing.clone()));

                let result =
                    self.perform_update(processing, index_handle, index_uuid, global_id);

                state.swap(State::Idle);

                result
            }
            None => Ok(None),
        }
    }

    fn perform_update(
        &self,
        processing: Processing,
        index_handle: IndexSender,
        index_uuid: Uuid,
        global_id: u64,
    ) -> Result<Option<()>> {
        // Process the pending update using the provided user function.
        let handle = Handle::current();
        let update_id = processing.id();
        let result =
            match handle.block_on(/*index_handle.update(index_uuid, processing.clone())*/ todo!()) {
                Ok(result) => result,
                Err(e) => Err(processing.fail(e)),
            };

        // Once the pending update have been successfully processed
        // we must remove the content from the pending and processing stores and
        // write the *new* meta to the processed-meta store and commit.
        let mut wtxn = self.env.write_txn()?;
        self.pending_queue
            .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;

        let result = match result {
            Ok(res) => res.into(),
            Err(res) => res.into(),
        };

        self.updates
            .put(&mut wtxn, &(index_uuid, update_id), &result)?;

        wtxn.commit()?;

        Ok(Some(()))
    }

    /// List the updates for `index_uuid`.
    pub fn list(&self, index_uuid: Uuid) -> Result<Vec<UpdateStatus>> {
        let mut update_list = BTreeMap::<u64, UpdateStatus>::new();

        let txn = self.env.read_txn()?;

        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
        for entry in pendings {
            let ((_, uuid, id), pending) = entry?;
            if uuid == index_uuid {
                update_list.insert(id, pending.decode()?.into());
            }
        }

        let updates = self
            .updates
            .remap_key_type::<ByteSlice>()
            .prefix_iter(&txn, index_uuid.as_bytes())?;

        for entry in updates {
            let (_, update) = entry?;
            update_list.insert(update.id(), update);
        }

        // If the currently processing update is from this index, replace the corresponding pending update with this one.
        match *self.state.read() {
            State::Processing(uuid, ref processing) if uuid == index_uuid => {
                update_list.insert(processing.id(), processing.clone().into());
            }
            _ => (),
        }

        Ok(update_list.into_iter().map(|(_, v)| v).collect())
    }

    /// Returns the update associated meta or `None` if the update doesn't exist.
    pub fn meta(&self, index_uuid: Uuid, update_id: u64) -> heed::Result<Option<UpdateStatus>> {
        // Check if the update is the one currently processing
        match *self.state.read() {
            State::Processing(uuid, ref processing)
                if uuid == index_uuid && processing.id() == update_id =>
            {
                return Ok(Some(processing.clone().into()));
            }
            _ => (),
        }

        let txn = self.env.read_txn()?;
        // Else, check if it is in the updates database:
        let update = self.updates.get(&txn, &(index_uuid, update_id))?;

        if let Some(update) = update {
            return Ok(Some(update));
        }

        // If nothing was found yet, we resolve to iterate over the pending queue.
        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

        for entry in pendings {
            let ((_, uuid, id), pending) = entry?;
            if uuid == index_uuid && id == update_id {
                return Ok(Some(pending.decode()?.into()));
            }
        }

        // No update was found.
        Ok(None)
    }

    /// Delete all updates for an index from the update store. If the currently processing update
    /// is for `index_uuid`, the call will block until the update is terminated.
    pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
        let mut txn = self.env.write_txn()?;
        // Contains all the content file paths that we need to be removed if the deletion was successful.
        let uuids_to_remove = Vec::new();

        let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();

        while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
            if uuid == index_uuid {
                let mut _pending = pending.decode()?;
                //if let Some(update_uuid) = pending.content.take() {
                //    uuids_to_remove.push(update_uuid);
                //}

                // Invariant check: we can only delete the current entry when we don't hold
                // references to it anymore. This must be done after we have retrieved its content.
                unsafe {
                    pendings.del_current()?;
                }
            }
        }

        drop(pendings);

        let mut updates = self
            .updates
            .remap_key_type::<ByteSlice>()
            .prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
            .lazily_decode_data();

        while let Some(_) = updates.next() {
            unsafe {
                updates.del_current()?;
            }
        }

        drop(updates);

        txn.commit()?;

        // If the currently processing update is from our index, we wait until it is
        // finished before returning. This ensure that no write to the index occurs after we delete it.
        if let State::Processing(uuid, _) = *self.state.read() {
            if uuid == index_uuid {
                // wait for a write lock, do nothing with it.
                self.state.write();
            }
        }

        // Finally, remove any outstanding update files. This must be done after waiting for the
        // last update to ensure that the update files are not deleted before the update needs
        // them.
        uuids_to_remove
            .iter()
            .map(|uuid: &Uuid| update_files_path(&self.path).join(uuid.to_string()))
            .for_each(|path| {
                let _ = remove_file(path);
            });

        Ok(())
    }

    pub fn snapshot(
        &self,
        uuids: &HashSet<Uuid>,
        path: impl AsRef<Path>,
        handle: IndexSender,
    ) -> Result<()> {
        let state_lock = self.state.write();
        state_lock.swap(State::Snapshoting);

        let txn = self.env.write_txn()?;

        let update_path = path.as_ref().join("updates");
        create_dir_all(&update_path)?;

        // acquire write lock to prevent further writes during snapshot
        create_dir_all(&update_path)?;
        let db_path = update_path.join("data.mdb");

        // create db snapshot
        self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;

        let update_files_path = update_path.join(UPDATE_DIR);
        create_dir_all(&update_files_path)?;

        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

        for entry in pendings {
            let ((_, _uuid, _), _pending) = entry?;
            //if uuids.contains(&uuid) {
            //    if let Enqueued {
            //        content: Some(uuid),
            //        ..
            //    } = pending.decode()?
            //    {
            //        let path = update_uuid_to_file_path(&self.path, uuid);
            //        copy(path, &update_files_path)?;
            //    }
            //}
        }

        let path = &path.as_ref().to_path_buf();
        let handle = &handle;
        // Perform the snapshot of each index concurently. Only a third of the capabilities of
        // the index actor at a time not to put too much pressure on the index actor
        let mut stream = futures::stream::iter(uuids.iter())
            .map(move |uuid| todo!() /*handle.snapshot(*uuid, path.clone())*/)
            .buffer_unordered(CONCURRENT_INDEX_MSG / 3);

        Handle::current().block_on(async {
            while let Some(res) = stream.next().await {
                res?;
            }
            Ok(()) as Result<()>
        })?;

        Ok(())
    }

    pub fn get_info(&self) -> Result<UpdateStoreInfo> {
        let size = self.env.size();
        let txn = self.env.read_txn()?;
        for entry in self.pending_queue.iter(&txn)? {
            let (_, _pending) = entry?;
            //if let Enqueued {
            //    content: Some(uuid),
            //    ..
            //} = pending
            //{
            //    let path = update_uuid_to_file_path(&self.path, uuid);
            //    size += File::open(path)?.metadata()?.len();
            //}
        }
        let processing = match *self.state.read() {
            State::Processing(uuid, _) => Some(uuid),
            _ => None,
        };

        Ok(UpdateStoreInfo { size, processing })
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::index_controller::{
        index_actor::{error::IndexActorError, MockIndexActorHandle},
        UpdateResult,
    };

    use futures::future::ok;

    #[actix_rt::test]
    async fn test_next_id() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut options = EnvOpenOptions::new();
        let handle = Arc::new(MockIndexActorHandle::new());
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(
            options,
            dir.path(),
            handle,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();

        let index1_uuid = Uuid::new_v4();
        let index2_uuid = Uuid::new_v4();

        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((0, 0), ids);

        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((1, 0), ids);

        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((2, 1), ids);
    }

    #[actix_rt::test]
    async fn test_register_update() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut options = EnvOpenOptions::new();
        let handle = Arc::new(MockIndexActorHandle::new());
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(
            options,
            dir.path(),
            handle,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        let meta = UpdateMeta::ClearDocuments;
        let uuid = Uuid::new_v4();
        let store_clone = update_store.clone();
        tokio::task::spawn_blocking(move || {
            store_clone.register_update(meta, None, uuid).unwrap();
        })
        .await
        .unwrap();

        let txn = update_store.env.read_txn().unwrap();
        assert!(update_store
            .pending_queue
            .get(&txn, &(0, uuid, 0))
            .unwrap()
            .is_some());
    }

    #[actix_rt::test]
    async fn test_process_update() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut handle = MockIndexActorHandle::new();

        handle
            .expect_update()
            .times(2)
            .returning(|_index_uuid, processing, _file| {
                if processing.id() == 0 {
                    Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
                } else {
                    Box::pin(ok(Err(
                        processing.fail(IndexActorError::ExistingPrimaryKey.into())
                    )))
                }
            });

        let handle = Arc::new(handle);

        let mut options = EnvOpenOptions::new();
        options.map_size(4096 * 100);
        let store = UpdateStore::open(
            options,
            dir.path(),
            handle.clone(),
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();

        // wait a bit for the event loop exit.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let mut txn = store.env.write_txn().unwrap();

        let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
        let uuid = Uuid::new_v4();

        store
            .pending_queue
            .put(&mut txn, &(0, uuid, 0), &update)
            .unwrap();

        let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);

        store
            .pending_queue
            .put(&mut txn, &(1, uuid, 1), &update)
            .unwrap();

        txn.commit().unwrap();

        // Process the pending, and check that it has been moved to the update databases, and
        // removed from the pending database.
        let store_clone = store.clone();
        tokio::task::spawn_blocking(move || {
            store_clone.process_pending_update(handle.clone()).unwrap();
            store_clone.process_pending_update(handle).unwrap();
        })
        .await
        .unwrap();

        let txn = store.env.read_txn().unwrap();

        assert!(store.pending_queue.first(&txn).unwrap().is_none());
        let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();

        assert!(matches!(update, UpdateStatus::Processed(_)));
        let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();

        assert!(matches!(update, UpdateStatus::Failed(_)));
    }
}
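Note — illustrative sketch, not part of this commit: the StateLock above combines a Mutex (to serialize writers) with an ArcSwap (so readers never block). The hypothetical miniature below shows the same pattern in isolation; readers load the current Arc lock-free while writers publish a new Arc under the mutex.

use std::sync::Arc;
use arc_swap::ArcSwap;
use parking_lot::Mutex;

struct Shared<T> {
    write_lock: Mutex<()>,
    data: ArcSwap<T>,
}

impl<T> Shared<T> {
    fn read(&self) -> Arc<T> {
        // Lock-free snapshot of the current value.
        self.data.load_full()
    }

    fn write(&self, value: T) {
        // Only one writer at a time; readers keep their old Arc until they reload.
        let _guard = self.write_lock.lock();
        self.data.store(Arc::new(value));
    }
}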