refactor index actor

This commit is contained in:
mpostma 2021-09-22 15:07:04 +02:00
parent 12542bf922
commit 5353be74c3
18 changed files with 590 additions and 596 deletions

View file

@ -0,0 +1,64 @@
use std::fmt;
use meilisearch_error::{Code, ErrorCode};
use crate::{error::MilliError, index::error::IndexError};
pub type Result<T> = std::result::Result<T, IndexActorError>;
#[derive(thiserror::Error, Debug)]
pub enum IndexActorError {
#[error("{0}")]
IndexError(#[from] IndexError),
#[error("Index already exists")]
IndexAlreadyExists,
#[error("Index not found")]
UnexistingIndex,
#[error("A primary key is already present. It's impossible to update it")]
ExistingPrimaryKey,
#[error("Internal Error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")]
Milli(#[from] milli::Error),
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for IndexActorError
where T: Send + Sync + 'static + fmt::Debug
{
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Internal(Box::new(other))
}
}
impl From<tokio::sync::oneshot::error::RecvError> for IndexActorError {
fn from(other: tokio::sync::oneshot::error::RecvError) -> Self {
Self::Internal(Box::new(other))
}
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for IndexActorError {
fn from(other: $other) -> Self {
Self::Internal(Box::new(other))
}
}
)*
}
}
internal_error!(heed::Error, tokio::task::JoinError, std::io::Error);
impl ErrorCode for IndexActorError {
fn error_code(&self) -> Code {
match self {
IndexActorError::IndexError(e) => e.error_code(),
IndexActorError::IndexAlreadyExists => Code::IndexAlreadyExists,
IndexActorError::UnexistingIndex => Code::IndexNotFound,
IndexActorError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent,
IndexActorError::Internal(_) => Code::Internal,
IndexActorError::Milli(e) => MilliError(e).error_code(),
}
}
}

View file

@ -0,0 +1,212 @@
use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::error::Result;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::updates::status::{Failed, Processed, Processing};
use crate::index_controller::{IndexSettings, IndexStats};
use super::IndexMeta;
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum IndexMsg {
CreateIndex {
uuid: Uuid,
primary_key: Option<String>,
ret: oneshot::Sender<Result<IndexMeta>>,
},
Update {
uuid: Uuid,
meta: Processing,
ret: oneshot::Sender<Result<std::result::Result<Processed, Failed>>>,
},
Search {
uuid: Uuid,
query: SearchQuery,
ret: oneshot::Sender<Result<SearchResult>>,
},
Settings {
uuid: Uuid,
ret: oneshot::Sender<Result<Settings<Checked>>>,
},
Documents {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
offset: usize,
limit: usize,
ret: oneshot::Sender<Result<Vec<Document>>>,
},
Document {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
doc_id: String,
ret: oneshot::Sender<Result<Document>>,
},
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
GetMeta {
uuid: Uuid,
ret: oneshot::Sender<Result<IndexMeta>>,
},
UpdateIndex {
uuid: Uuid,
index_settings: IndexSettings,
ret: oneshot::Sender<Result<IndexMeta>>,
},
Snapshot {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
Dump {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
GetStats {
uuid: Uuid,
ret: oneshot::Sender<Result<IndexStats>>,
},
}
impl IndexMsg {
pub async fn search(
sender: &mpsc::Sender<Self>,
uuid: Uuid,
query: SearchQuery,
) -> Result<SearchResult> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Search {
ret,
uuid,
query,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn update_index(
sender: &mpsc::Sender<Self>,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
let (ret, rcv) = oneshot::channel();
let msg = Self::UpdateIndex {
ret,
uuid,
index_settings,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn create_index(
sender: &mpsc::Sender<Self>,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
let (ret, rcv) = oneshot::channel();
let msg = Self::CreateIndex {
ret,
uuid,
primary_key,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn index_meta(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<IndexMeta> {
let (ret, rcv) = oneshot::channel();
let msg = Self::GetMeta { ret, uuid };
sender.send(msg).await?;
rcv.await?
}
pub async fn index_stats(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<IndexStats> {
let (ret, rcv) = oneshot::channel();
let msg = Self::GetStats { ret, uuid };
sender.send(msg).await?;
rcv.await?
}
pub async fn settings(sender: &mpsc::Sender<Self>, uuid: Uuid) -> Result<Settings<Checked>> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Settings { ret, uuid };
sender.send(msg).await?;
rcv.await?
}
pub async fn documents(
sender: &mpsc::Sender<Self>,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Documents {
ret,
uuid,
attributes_to_retrieve,
offset,
limit,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn document(
sender: &mpsc::Sender<Self>,
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
doc_id: String,
) -> Result<Document> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Document {
ret,
uuid,
attributes_to_retrieve,
doc_id,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn update(sender: &mpsc::Sender<Self>, uuid: Uuid, meta: Processing) -> Result<std::result::Result<Processed, Failed>> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Update {
ret,
uuid,
meta,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn snapshot(sender: &mpsc::Sender<IndexMsg>, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Snapshot {
uuid,
path,
ret,
};
sender.send(msg).await?;
rcv.await?
}
pub async fn dump(sender: &mpsc::Sender<Self>, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Dump {
uuid,
ret,
path,
};
sender.send(msg).await?;
rcv.await?
}
}

View file

@ -0,0 +1,483 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_stream::stream;
use futures::stream::StreamExt;
use heed::CompactionOption;
use log::debug;
use milli::update::UpdateBuilder;
use tokio::task::spawn_blocking;
use tokio::{fs, sync::mpsc};
use crate::index::update_handler::UpdateHandler;
use crate::index_controller::updates::status::{Failed, Processed, Processing};
use crate::index_controller::{get_arc_ownership_blocking, IndexStats};
use crate::options::IndexerOpts;
pub const CONCURRENT_INDEX_MSG: usize = 10;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub use message::IndexMsg;
use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
use error::Result;
use self::error::IndexActorError;
use self::store::{IndexStore, MapIndexStore};
use super::IndexSettings;
pub mod error;
mod message;
mod store;
pub type IndexHandlerSender = mpsc::Sender<IndexMsg>;
pub fn create_indexes_handler(
db_path: impl AsRef<Path>,
index_size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<IndexHandlerSender> {
let (sender, receiver) = mpsc::channel(100);
let store = MapIndexStore::new(&db_path, index_size);
let actor = IndexActor::new(receiver, store, indexer_options)?;
tokio::task::spawn(actor.run());
Ok(sender)
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub primary_key: Option<String>,
}
impl IndexMeta {
fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(txn)?;
let updated_at = index.updated_at(txn)?;
let primary_key = index.primary_key(txn)?.map(String::from);
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
pub struct IndexActor<S> {
receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>,
store: S,
}
impl<S> IndexActor<S>
where
S: IndexStore + Sync + Send,
{
pub fn new(
receiver: mpsc::Receiver<IndexMsg>,
store: S,
options: &IndexerOpts,
) -> anyhow::Result<Self> {
let update_handler = Arc::new(UpdateHandler::new(options)?);
let receiver = Some(receiver);
Ok(Self {
receiver,
update_handler,
store,
})
}
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
/// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time.
pub async fn run(mut self) {
let mut receiver = self
.receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let stream = stream! {
loop {
match receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each_concurrent(Some(CONCURRENT_INDEX_MSG), |msg| self.handle_message(msg))
.await;
}
async fn handle_message(&self, msg: IndexMsg) {
use IndexMsg::*;
match msg {
CreateIndex {
uuid,
primary_key,
ret,
} => {
let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
}
Update { ret, meta, uuid } => {
let _ = ret.send(self.handle_update(uuid, meta).await);
}
Search { ret, query, uuid } => {
let _ = ret.send(self.handle_search(uuid, query).await);
}
Settings { ret, uuid } => {
let _ = ret.send(self.handle_settings(uuid).await);
}
Documents {
ret,
uuid,
attributes_to_retrieve,
offset,
limit,
} => {
let _ = ret.send(
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve)
.await,
);
}
Document {
uuid,
attributes_to_retrieve,
doc_id,
ret,
} => {
let _ = ret.send(
self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve)
.await,
);
}
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
}
GetMeta { uuid, ret } => {
let _ = ret.send(self.handle_get_meta(uuid).await);
}
UpdateIndex {
uuid,
index_settings,
ret,
} => {
let _ = ret.send(self.handle_update_index(uuid, index_settings).await);
}
Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Dump { uuid, path, ret } => {
let _ = ret.send(self.handle_dump(uuid, path).await);
}
GetStats { uuid, ret } => {
let _ = ret.send(self.handle_get_stats(uuid).await);
}
}
}
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
let result = spawn_blocking(move || index.perform_search(query)).await??;
Ok(result)
}
async fn handle_create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
async fn handle_update(
&self,
uuid: Uuid,
meta: Processing,
) -> Result<std::result::Result<Processed, Failed>> {
debug!("Processing update {}", meta.id());
let update_handler = self.update_handler.clone();
let index = match self.store.get(uuid).await? {
Some(index) => index,
None => self.store.create(uuid, None).await?,
};
Ok(spawn_blocking(move || update_handler.handle_update(index, meta)).await?)
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
let result = spawn_blocking(move || index.settings()).await??;
Ok(result)
}
async fn handle_fetch_documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
let result =
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
.await??;
Ok(result)
}
async fn handle_fetch_document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
let result =
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
.await??;
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let index = self.store.delete(uuid).await?;
if let Some(index) = index {
tokio::task::spawn(async move {
let index = index.inner;
let store = get_arc_ownership_blocking(index).await;
spawn_blocking(move || {
store.prepare_for_closing().wait();
debug!("Index closed");
});
});
}
Ok(())
}
async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
None => Err(IndexActorError::UnexistingIndex),
}
}
async fn handle_update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
let result = spawn_blocking(move || match index_settings.primary_key {
Some(primary_key) => {
let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() {
return Err(IndexActorError::ExistingPrimaryKey);
}
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
builder.set_primary_key(primary_key);
builder.execute(|_, _| ())?;
let meta = IndexMeta::new_txn(&index, &txn)?;
txn.commit()?;
Ok(meta)
}
None => {
let meta = IndexMeta::new(&index)?;
Ok(meta)
}
})
.await??;
Ok(result)
}
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
create_dir_all(&path).await?;
if let Some(index) = self.store.get(uuid).await? {
let mut index_path = path.join(format!("index-{}", uuid));
create_dir_all(&index_path).await?;
index_path.push("data.mdb");
spawn_blocking(move || -> Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot.
let _txn = index.write_txn()?;
index
.env
.copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(())
})
.await??;
}
Ok(())
}
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
/// documents and all the settings.
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
let path = path.join(format!("indexes/index-{}/", uuid));
fs::create_dir_all(&path).await?;
tokio::task::spawn_blocking(move || index.dump(path)).await??;
Ok(())
}
async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexActorError::UnexistingIndex)?;
spawn_blocking(move || {
let rtxn = index.read_txn()?;
Ok(IndexStats {
size: index.size(),
number_of_documents: index.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: index.field_distribution(&rtxn)?,
})
})
.await?
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use super::*;
#[async_trait::async_trait]
/// Useful for passing around an `Arc<MockIndexActorHandle>` in tests.
impl IndexActorHandle for Arc<MockIndexActorHandle> {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
self.as_ref().create_index(uuid, primary_key).await
}
async fn update(
&self,
uuid: Uuid,
meta: Processing,
data: Option<std::fs::File>,
) -> Result<std::result::Result<Processed, Failed>> {
self.as_ref().update(uuid, meta, data).await
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
self.as_ref().search(uuid, query).await
}
async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
self.as_ref().settings(uuid).await
}
async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
self.as_ref()
.documents(uuid, offset, limit, attributes_to_retrieve)
.await
}
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
self.as_ref()
.document(uuid, doc_id, attributes_to_retrieve)
.await
}
async fn delete(&self, uuid: Uuid) -> Result<()> {
self.as_ref().delete(uuid).await
}
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
self.as_ref().get_index_meta(uuid).await
}
async fn update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
self.as_ref().update_index(uuid, index_settings).await
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
self.as_ref().snapshot(uuid, path).await
}
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
self.as_ref().dump(uuid, path).await
}
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
self.as_ref().get_index_stats(uuid).await
}
}
}

View file

@ -0,0 +1,109 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use milli::update::UpdateBuilder;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::error::{IndexActorError, Result};
use crate::index::Index;
use crate::index_controller::update_file_store::UpdateFileStore;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait]
pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
}
pub struct MapIndexStore {
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
index_size: usize,
update_file_store: Arc<UpdateFileStore>,
}
impl MapIndexStore {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap());
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
Self {
index_store,
path,
index_size,
update_file_store,
}
}
}
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
// We need to keep the lock until we are sure the db file has been opened correclty, to
// ensure that another db is not created at the same time.
let mut lock = self.index_store.write().await;
if let Some(index) = lock.get(&uuid) {
return Ok(index.clone());
}
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexActorError::IndexAlreadyExists);
}
let index_size = self.index_size;
let file_store = self.update_file_store.clone();
let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, file_store)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
builder.set_primary_key(primary_key);
builder.execute(|_, _| ())?;
txn.commit()?;
}
Ok(index)
})
.await??;
lock.insert(uuid, index.clone());
Ok(index)
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
None => {
// drop the guard here so we can perform the write after without deadlocking;
drop(guard);
let path = self.path.join(format!("index-{}", uuid));
if !path.exists() {
return Ok(None);
}
let index_size = self.index_size;
let file_store = self.update_file_store.clone();
let index = spawn_blocking(move || Index::open(path, index_size, file_store)).await??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path).await?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
}