introduce index resolver

This commit is contained in:
mpostma 2021-09-24 11:53:11 +02:00
parent 5353be74c3
commit 42a6260b65
23 changed files with 833 additions and 193 deletions

View File

@ -17,6 +17,8 @@ pub enum IndexError {
Facet(#[from] FacetError),
#[error("{0}")]
Milli(#[from] milli::Error),
#[error("A primary key is already present. It's impossible to update it")]
ExistingPrimaryKey,
}
internal_error!(
@ -33,6 +35,7 @@ impl ErrorCode for IndexError {
IndexError::DocumentNotFound(_) => Code::DocumentNotFound,
IndexError::Facet(e) => e.error_code(),
IndexError::Milli(e) => MilliError(e).error_code(),
IndexError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent,
}
}
}

View File

@ -5,19 +5,23 @@ use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::{EnvOpenOptions, RoTxn};
use milli::update::Setting;
use milli::{obkv_to_json, FieldId};
use milli::{FieldDistribution, FieldId, obkv_to_json};
use serde_json::{Map, Value};
use serde::{Serialize, Deserialize};
use error::Result;
pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Checked, Facets, Settings, Unchecked};
use uuid::Uuid;
use crate::EnvSizer;
use crate::index_controller::update_file_store::UpdateFileStore;
use self::error::IndexError;
use self::update_handler::UpdateHandler;
pub mod error;
pub mod update_handler;
@ -28,10 +32,51 @@ mod updates;
pub type Document = Map<String, Value>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub primary_key: Option<String>,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
}
impl IndexMeta {
pub fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(txn)?;
let updated_at = index.updated_at(txn)?;
let primary_key = index.primary_key(txn)?.map(String::from);
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
#[derive(Clone)]
pub struct Index {
pub uuid: Uuid,
pub inner: Arc<milli::Index>,
update_file_store: Arc<UpdateFileStore>,
update_handler: Arc<UpdateHandler>,
}
impl Deref for Index {
@ -43,14 +88,28 @@ impl Deref for Index {
}
impl Index {
pub fn open(path: impl AsRef<Path>, size: usize, update_file_store: Arc<UpdateFileStore>) -> Result<Self> {
pub fn open(path: impl AsRef<Path>, size: usize, update_file_store: Arc<UpdateFileStore>, uuid: Uuid, update_handler: Arc<UpdateHandler>) -> Result<Self> {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index { inner, update_file_store })
Ok(Index { inner, update_file_store, uuid, update_handler })
}
pub fn stats(&self) -> Result<IndexStats> {
let rtxn = self.read_txn()?;
Ok(IndexStats {
size: self.size(),
number_of_documents: self.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: self.field_distribution(&rtxn)?,
})
}
pub fn meta(&self) -> Result<IndexMeta> {
IndexMeta::new(self)
}
pub fn settings(&self) -> Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)

View File

@ -52,7 +52,7 @@ impl UpdateHandler {
pub fn handle_update(
&self,
index: Index,
index: &Index,
meta: Processing,
) -> Result<Processed, Failed> {
let update_id = meta.id();

View File

@ -8,10 +8,10 @@ use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder};
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;
use crate::index_controller::updates::status::UpdateResult;
use crate::index_controller::updates::status::{Failed, Processed, Processing, UpdateResult};
use super::Index;
use super::error::Result;
use super::{Index, IndexMeta};
use super::error::{IndexError, Result};
fn serialize_with_wildcard<S>(
field: &Setting<Vec<String>>,
@ -163,6 +163,31 @@ pub struct Facets {
}
impl Index {
pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> {
self.update_handler.handle_update(self, update)
}
pub fn update_primary_key(&self, primary_key: Option<String>) -> Result<IndexMeta> {
match primary_key {
Some(primary_key) => {
let mut txn = self.write_txn()?;
if self.primary_key(&txn)?.is_some() {
return Err(IndexError::ExistingPrimaryKey);
}
let mut builder = UpdateBuilder::new(0).settings(&mut txn, self);
builder.set_primary_key(primary_key);
builder.execute(|_, _| ())?;
let meta = IndexMeta::new_txn(self, &txn)?;
txn.commit()?;
Ok(meta)
}
None => {
let meta = IndexMeta::new(self)?;
Ok(meta)
}
}
}
pub fn update_documents(
&self,
method: IndexDocumentsMethod,

View File

@ -10,14 +10,14 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use super::error::{DumpActorError, Result};
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
use crate::index_controller::uuid_resolver::UuidResolverSender;
use crate::index_controller::index_resolver::HardStateIndexResolver;
use crate::index_controller::updates::UpdateSender;
pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor {
inbox: Option<mpsc::Receiver<DumpMsg>>,
uuid_resolver: UuidResolverSender,
index_resolver: Arc<HardStateIndexResolver>,
update: UpdateSender,
dump_path: PathBuf,
lock: Arc<Mutex<()>>,
@ -34,7 +34,7 @@ fn generate_uid() -> String {
impl DumpActor {
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolverSender,
index_resolver: Arc<HardStateIndexResolver>,
update: UpdateSender,
dump_path: impl AsRef<Path>,
index_db_size: usize,
@ -44,7 +44,7 @@ impl DumpActor {
let lock = Arc::new(Mutex::new(()));
Self {
inbox: Some(inbox),
uuid_resolver,
index_resolver,
update,
dump_path: dump_path.as_ref().into(),
dump_infos,
@ -113,7 +113,7 @@ impl DumpActor {
let task = DumpTask {
path: self.dump_path.clone(),
uuid_resolver: self.uuid_resolver.clone(),
index_resolver: self.index_resolver.clone(),
update_handle: self.update.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,

View File

@ -1,7 +1,7 @@
use meilisearch_error::{Code, ErrorCode};
use crate::index_controller::updates::error::UpdateActorError;
use crate::index_controller::uuid_resolver::error::UuidResolverError;
use crate::index_controller::index_resolver::error::IndexResolverError;
use crate::index_controller::updates::error::UpdateLoopError;
pub type Result<T> = std::result::Result<T, DumpActorError>;
@ -14,9 +14,9 @@ pub enum DumpActorError {
#[error("Internal error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")]
UuidResolver(#[from] UuidResolverError),
IndexResolver(#[from] IndexResolverError),
#[error("{0}")]
UpdateActor(#[from] UpdateActorError),
UpdateLoop(#[from] UpdateLoopError),
}
macro_rules! internal_error {
@ -45,8 +45,8 @@ impl ErrorCode for DumpActorError {
DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress,
DumpActorError::DumpDoesNotExist(_) => Code::NotFound,
DumpActorError::Internal(_) => Code::Internal,
DumpActorError::UuidResolver(e) => e.error_code(),
DumpActorError::UpdateActor(e) => e.error_code(),
DumpActorError::IndexResolver(e) => e.error_code(),
DumpActorError::UpdateLoop(e) => e.error_code(),
}
}
}

View File

@ -1,8 +1,9 @@
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use crate::index_controller::uuid_resolver::UuidResolverSender;
use crate::index_controller::index_resolver::HardStateIndexResolver;
use super::error::Result;
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg};
@ -32,7 +33,7 @@ impl DumpActorHandle for DumpActorHandleImpl {
impl DumpActorHandleImpl {
pub fn new(
path: impl AsRef<Path>,
uuid_resolver: UuidResolverSender,
index_resolver: Arc<HardStateIndexResolver>,
update: crate::index_controller::updates::UpdateSender,
index_db_size: usize,
update_db_size: usize,
@ -40,7 +41,7 @@ impl DumpActorHandleImpl {
let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(
receiver,
uuid_resolver,
index_resolver,
update,
path,
index_db_size,

View File

@ -7,7 +7,7 @@ use milli::update::Setting;
use serde::{Deserialize, Deserializer, Serialize};
use uuid::Uuid;
use crate::index_controller::uuid_resolver::store::HeedUuidStore;
use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
use crate::index_controller::{self, IndexMetadata};
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
use crate::{

View File

@ -5,8 +5,8 @@ use log::info;
use serde::{Deserialize, Serialize};
use crate::index::Index;
use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
use crate::index_controller::updates::store::UpdateStore;
use crate::index_controller::{uuid_resolver::store::HeedUuidStore};
use crate::options::IndexerOpts;
#[derive(Serialize, Deserialize, Debug)]

View File

@ -1,5 +1,6 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Context;
use chrono::{DateTime, Utc};
@ -16,11 +17,10 @@ pub use actor::DumpActor;
pub use handle_impl::*;
pub use message::DumpMsg;
use super::index_resolver::HardStateIndexResolver;
use super::updates::UpdateSender;
use super::uuid_resolver::UuidResolverSender;
use crate::index_controller::dump_actor::error::DumpActorError;
use crate::index_controller::updates::UpdateMsg;
use crate::index_controller::uuid_resolver::UuidResolverMsg;
use crate::options::IndexerOpts;
use error::Result;
@ -154,7 +154,7 @@ pub fn load_dump(
struct DumpTask {
path: PathBuf,
uuid_resolver: UuidResolverSender,
index_resolver: Arc<HardStateIndexResolver>,
update_handle: UpdateSender,
uid: String,
update_db_size: usize,
@ -177,9 +177,9 @@ impl DumpTask {
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
let uuids = UuidResolverMsg::dump(&self.uuid_resolver, temp_dump_path.clone()).await?;
let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
UpdateMsg::dump(&self.update_handle, uuids.into_iter().collect(), temp_dump_path.clone()).await?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;

View File

@ -2,13 +2,13 @@ use std::error::Error;
use meilisearch_error::Code;
use meilisearch_error::ErrorCode;
use tokio::task::JoinError;
use crate::index::error::IndexError;
use super::dump_actor::error::DumpActorError;
use super::indexes::error::IndexActorError;
use super::updates::error::UpdateActorError;
use super::uuid_resolver::error::UuidResolverError;
use super::index_resolver::error::IndexResolverError;
use super::updates::error::UpdateLoopError;
pub type Result<T> = std::result::Result<T, IndexControllerError>;
@ -17,11 +17,9 @@ pub enum IndexControllerError {
#[error("Index creation must have an uid")]
MissingUid,
#[error("{0}")]
Uuid(#[from] UuidResolverError),
IndexResolver(#[from] IndexResolverError),
#[error("{0}")]
IndexActor(#[from] IndexActorError),
#[error("{0}")]
UpdateActor(#[from] UpdateActorError),
UpdateLoop(#[from] UpdateLoopError),
#[error("{0}")]
DumpActor(#[from] DumpActorError),
#[error("{0}")]
@ -30,13 +28,14 @@ pub enum IndexControllerError {
Internal(Box<dyn Error + Send + Sync + 'static>),
}
internal_error!(IndexControllerError: JoinError);
impl ErrorCode for IndexControllerError {
fn error_code(&self) -> Code {
match self {
IndexControllerError::MissingUid => Code::BadRequest,
IndexControllerError::Uuid(e) => e.error_code(),
IndexControllerError::IndexActor(e) => e.error_code(),
IndexControllerError::UpdateActor(e) => e.error_code(),
IndexControllerError::IndexResolver(e) => e.error_code(),
IndexControllerError::UpdateLoop(e) => e.error_code(),
IndexControllerError::DumpActor(e) => e.error_code(),
IndexControllerError::IndexError(e) => e.error_code(),
IndexControllerError::Internal(_) => Code::Internal,

View File

@ -0,0 +1,63 @@
use std::fmt;
use meilisearch_error::{Code, ErrorCode};
use tokio::sync::mpsc::error::SendError as MpscSendError;
use tokio::sync::oneshot::error::RecvError as OneshotRecvError;
use crate::{error::MilliError, index::error::IndexError};
pub type Result<T> = std::result::Result<T, IndexResolverError>;
#[derive(thiserror::Error, Debug)]
pub enum IndexResolverError {
#[error("{0}")]
IndexError(#[from] IndexError),
#[error("Index already exists")]
IndexAlreadyExists,
#[error("Index {0} not found")]
UnexistingIndex(String),
#[error("A primary key is already present. It's impossible to update it")]
ExistingPrimaryKey,
#[error("Internal Error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")]
Milli(#[from] milli::Error),
#[error("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_).")]
BadlyFormatted(String),
}
impl<T> From<MpscSendError<T>> for IndexResolverError
where T: Send + Sync + 'static + fmt::Debug
{
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Internal(Box::new(other))
}
}
impl From<OneshotRecvError> for IndexResolverError {
fn from(other: tokio::sync::oneshot::error::RecvError) -> Self {
Self::Internal(Box::new(other))
}
}
internal_error!(
IndexResolverError: heed::Error,
uuid::Error,
std::io::Error,
tokio::task::JoinError,
serde_json::Error
);
impl ErrorCode for IndexResolverError {
fn error_code(&self) -> Code {
match self {
IndexResolverError::IndexError(e) => e.error_code(),
IndexResolverError::IndexAlreadyExists => Code::IndexAlreadyExists,
IndexResolverError::UnexistingIndex(_) => Code::IndexNotFound,
IndexResolverError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent,
IndexResolverError::Internal(_) => Code::Internal,
IndexResolverError::Milli(e) => MilliError(e).error_code(),
IndexResolverError::BadlyFormatted(_) => Code::InvalidIndexUid,
}
}
}

View File

@ -0,0 +1,116 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use milli::update::UpdateBuilder;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::error::{IndexResolverError, Result};
use crate::index::Index;
use crate::index::update_handler::UpdateHandler;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::options::IndexerOpts;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait]
pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
}
pub struct MapIndexStore {
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
index_size: usize,
update_file_store: Arc<UpdateFileStore>,
update_handler: Arc<UpdateHandler>,
}
impl MapIndexStore {
pub fn new(path: impl AsRef<Path>, index_size: usize, indexer_opts: &IndexerOpts) -> anyhow::Result<Self> {
let update_handler = Arc::new(UpdateHandler::new(indexer_opts)?);
let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap());
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self {
index_store,
path,
index_size,
update_file_store,
update_handler,
})
}
}
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
// We need to keep the lock until we are sure the db file has been opened correclty, to
// ensure that another db is not created at the same time.
let mut lock = self.index_store.write().await;
if let Some(index) = lock.get(&uuid) {
return Ok(index.clone());
}
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexResolverError::IndexAlreadyExists);
}
let index_size = self.index_size;
let file_store = self.update_file_store.clone();
let update_handler = self.update_handler.clone();
let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, file_store, uuid, update_handler)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
builder.set_primary_key(primary_key);
builder.execute(|_, _| ())?;
txn.commit()?;
}
Ok(index)
})
.await??;
lock.insert(uuid, index.clone());
Ok(index)
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
None => {
// drop the guard here so we can perform the write after without deadlocking;
drop(guard);
let path = self.path.join(format!("index-{}", uuid));
if !path.exists() {
return Ok(None);
}
let index_size = self.index_size;
let file_store = self.update_file_store.clone();
let update_handler = self.update_handler.clone();
let index = spawn_blocking(move || Index::open(path, index_size, file_store, uuid, update_handler)).await??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path).await?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
}

View File

@ -0,0 +1,37 @@
use std::{collections::HashSet, path::PathBuf};
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::index::Index;
use super::error::Result;
pub enum IndexResolverMsg {
Get {
uid: String,
ret: oneshot::Sender<Result<Index>>,
},
Delete {
uid: String,
ret: oneshot::Sender<Result<Index>>,
},
List {
ret: oneshot::Sender<Result<Vec<(String, Index)>>>,
},
Insert {
uuid: Uuid,
name: String,
ret: oneshot::Sender<Result<()>>,
},
SnapshotRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Index>>>,
},
GetSize {
ret: oneshot::Sender<Result<u64>>,
},
DumpRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Index>>>,
},
}

View File

@ -0,0 +1,117 @@
pub mod uuid_store;
mod index_store;
//mod message;
pub mod error;
use std::path::Path;
use uuid::Uuid;
use uuid_store::{UuidStore, HeedUuidStore};
use index_store::{IndexStore, MapIndexStore};
use error::{Result, IndexResolverError};
use crate::{index::Index, options::IndexerOpts};
pub type HardStateIndexResolver = IndexResolver<HeedUuidStore, MapIndexStore>;
pub fn create_index_resolver(path: impl AsRef<Path>, index_size: usize, indexer_opts: &IndexerOpts) -> anyhow::Result<HardStateIndexResolver> {
let uuid_store = HeedUuidStore::new(&path)?;
let index_store = MapIndexStore::new(&path, index_size, indexer_opts)?;
Ok(IndexResolver::new(uuid_store, index_store))
}
pub struct IndexResolver<U, I> {
index_uuid_store: U,
index_store: I,
}
impl<U, I> IndexResolver<U ,I>
where U: UuidStore,
I: IndexStore,
{
pub fn new(
index_uuid_store: U,
index_store: I,
) -> Self {
Self {
index_uuid_store,
index_store,
}
}
pub async fn dump(&self, _path: impl AsRef<Path>) -> Result<Vec<Uuid>> {
todo!()
}
pub async fn get_size(&self) -> Result<u64> {
todo!()
}
pub async fn perform_snapshot(&self, _path: impl AsRef<Path>) -> Result<()> {
todo!()
}
pub async fn create_index(&self, uid: String, primary_key: Option<String>) -> Result<(Uuid, Index)> {
let uuid = Uuid::new_v4();
let index = self.index_store.create(uuid, primary_key).await?;
self.index_uuid_store.insert(uid, uuid).await?;
Ok((uuid, index))
}
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
let uuids = self.index_uuid_store.list().await?;
let mut indexes = Vec::new();
for (name, uuid) in uuids {
match self.index_store.get(uuid).await? {
Some(index) => {
indexes.push((name, index))
},
None => {
// we found an unexisting index, we remove it from the uuid store
let _ = self.index_uuid_store.delete(name).await;
},
}
}
Ok(indexes)
}
pub async fn delete_index(&self, uid: String) -> Result<()> {
match self.index_uuid_store.delete(uid.clone()).await? {
Some(uuid) => {
let _ = self.index_store.delete(uuid).await;
Ok(())
}
None => Err(IndexResolverError::UnexistingIndex(uid)),
}
}
pub async fn get_index_by_uuid(&self, uuid: Uuid) -> Result<Index> {
// TODO: Handle this error better.
self.index_store.get(uuid).await?.ok_or(IndexResolverError::UnexistingIndex(String::new()))
}
pub async fn get_index(&self, uid: String) -> Result<Index> {
match self.index_uuid_store.get_uuid(uid).await? {
(name, Some(uuid)) => {
match self.index_store.get(uuid).await? {
Some(index) => Ok(index),
None => {
// For some reason we got a uuid to an unexisting index, we return an error,
// and remove the uuid from th uuid store.
let _ = self.index_uuid_store.delete(name.clone()).await;
Err(IndexResolverError::UnexistingIndex(name))
},
}
}
(name, _) => Err(IndexResolverError::UnexistingIndex(name))
}
}
pub async fn get_uuid(&self, uid: String) -> Result<Uuid> {
match self.index_uuid_store.get_uuid(uid).await? {
(_, Some(uuid)) => Ok(uuid),
(name, _) => Err(IndexResolverError::UnexistingIndex(name))
}
}
}

View File

@ -0,0 +1,226 @@
use std::collections::HashSet;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use heed::types::{ByteSlice, Str};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::error::{Result, IndexResolverError};
use crate::EnvSizer;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
#[derive(Serialize, Deserialize)]
struct DumpEntry {
uuid: Uuid,
uid: String,
}
const UUIDS_DB_PATH: &str = "index_uuids";
#[async_trait::async_trait]
pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn get_uuid(&self, uid: String) -> Result<(String, Option<Uuid>)>;
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Clone)]
pub struct HeedUuidStore {
env: Env,
db: Database<Str, ByteSlice>,
}
impl HeedUuidStore {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join(UUIDS_DB_PATH);
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
let env = options.open(path)?;
let db = env.create_database(None)?;
Ok(Self { env, db })
}
pub fn get_uuid(&self, name: &str) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
match db.get(&txn, name)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
Ok(Some(uuid))
}
None => Ok(None),
}
}
pub fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(uuid))
}
None => Ok(None),
}
}
pub fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push((name.to_owned(), uuid))
}
Ok(entries)
}
pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
if db.get(&txn, &name)?.is_some() {
return Err(IndexResolverError::IndexAlreadyExists);
}
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
}
pub fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
let env = self.env.clone();
let db = self.db;
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = HashSet::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.insert(uuid);
}
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push(UUIDS_DB_PATH);
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
}
pub fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
}
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let dump_path = path.join(UUIDS_DB_PATH);
create_dir_all(&dump_path)?;
let dump_file_path = dump_path.join("data.jsonl");
let mut dump_file = File::create(&dump_file_path)?;
let mut uuids = HashSet::new();
let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? {
let (uid, uuid) = entry?;
let uid = uid.to_string();
let uuid = Uuid::from_slice(uuid)?;
let entry = DumpEntry { uuid, uid };
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write_all(b"\n").unwrap();
uuids.insert(uuid);
}
Ok(uuids)
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<()> {
let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
std::fs::create_dir_all(&uuid_resolver_path)?;
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);
let mut line = String::new();
let db = Self::new(dst)?;
let mut txn = db.env.write_txn()?;
loop {
match indexes.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
println!("importing {} {}", uid, uuid);
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
}
Err(e) => return Err(e.into()),
}
line.clear();
}
txn.commit()?;
db.env.prepare_for_closing().wait();
Ok(())
}
}
#[async_trait::async_trait]
impl UuidStore for HeedUuidStore {
async fn get_uuid(&self, name: String) -> Result<(String, Option<Uuid>)> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.get_uuid(&name).map(|res| (name, res))).await?
}
async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.delete(uid)).await?
}
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.list()).await?
}
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.insert(name, uuid)).await?
}
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.snapshot(path)).await?
}
async fn get_size(&self) -> Result<u64> {
self.get_size()
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.dump(path)).await?
}
}

View File

@ -42,7 +42,7 @@ pub fn create_indexes_handler(
indexer_options: &IndexerOpts,
) -> anyhow::Result<IndexHandlerSender> {
let (sender, receiver) = mpsc::channel(100);
let store = MapIndexStore::new(&db_path, index_size);
let store = MapIndexStore::new(&db_path, index_size, indexer_options);
let actor = IndexActor::new(receiver, store, indexer_options)?;
tokio::task::spawn(actor.run());
@ -59,7 +59,7 @@ pub struct IndexMeta {
}
impl IndexMeta {
fn new(index: &Index) -> Result<Self> {
pub fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
@ -223,7 +223,7 @@ where
None => self.store.create(uuid, None).await?,
};
Ok(spawn_blocking(move || update_handler.handle_update(index, meta)).await?)
Ok(spawn_blocking(move || update_handler.handle_update(&index, meta)).await?)
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {

View File

@ -10,6 +10,7 @@ use uuid::Uuid;
use super::error::{IndexActorError, Result};
use crate::index::Index;
use crate::index::update_handler::UpdateHandler;
use crate::index_controller::update_file_store::UpdateFileStore;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
@ -26,10 +27,11 @@ pub struct MapIndexStore {
path: PathBuf,
index_size: usize,
update_file_store: Arc<UpdateFileStore>,
update_handler: Arc<UpdateHandler>,
}
impl MapIndexStore {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
pub fn new(path: impl AsRef<Path>, index_size: usize, update_handler: Arc<UpdateHandler>) -> Self {
let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap());
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
@ -38,6 +40,7 @@ impl MapIndexStore {
path,
index_size,
update_file_store,
update_handler,
}
}
}
@ -59,8 +62,9 @@ impl IndexStore for MapIndexStore {
let index_size = self.index_size;
let file_store = self.update_file_store.clone();
let update_handler = self.update_handler.clone();
let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, file_store)?;
let index = Index::open(path, index_size, file_store, uuid, update_handler)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;

View File

@ -9,38 +9,52 @@ use chrono::{DateTime, Utc};
use futures::Stream;
use log::info;
use milli::update::IndexDocumentsMethod;
use milli::FieldDistribution;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use uuid::Uuid;
use dump_actor::DumpActorHandle;
pub use dump_actor::{DumpInfo, DumpStatus};
use snapshot::load_snapshot;
use uuid_resolver::error::UuidResolverError;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
use crate::index_controller::index_resolver::create_index_resolver;
use crate::options::IndexerOpts;
use error::Result;
use crate::index::error::Result as IndexResult;
use self::dump_actor::load_dump;
use self::indexes::IndexMsg;
use self::index_resolver::HardStateIndexResolver;
use self::index_resolver::error::IndexResolverError;
use self::updates::status::UpdateStatus;
use self::updates::UpdateMsg;
use self::uuid_resolver::UuidResolverMsg;
mod dump_actor;
pub mod error;
pub mod indexes;
//pub mod indexes;
mod snapshot;
pub mod update_file_store;
pub mod updates;
mod uuid_resolver;
//mod uuid_resolver;
mod index_resolver;
pub type Payload = Box<
dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
>;
macro_rules! time {
($e:expr) => {
{
let now = std::time::Instant::now();
let result = $e;
let elapsed = now.elapsed();
println!("elapsed at line {}: {}ms ({}ns)", line!(), elapsed.as_millis(), elapsed.as_nanos());
result
}
};
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMetadata {
@ -49,7 +63,7 @@ pub struct IndexMetadata {
pub uid: String,
name: String,
#[serde(flatten)]
pub meta: indexes::IndexMeta,
pub meta: IndexMeta,
}
#[derive(Clone, Debug)]
@ -58,23 +72,9 @@ pub struct IndexSettings {
pub primary_key: Option<String>,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
}
#[derive(Clone)]
pub struct IndexController {
uuid_resolver: uuid_resolver::UuidResolverSender,
index_handle: indexes::IndexHandlerSender,
index_resolver: Arc<HardStateIndexResolver>,
update_handle: updates::UpdateSender,
dump_handle: dump_actor::DumpActorHandleImpl,
}
@ -149,17 +149,15 @@ impl IndexControllerBuilder {
std::fs::create_dir_all(db_path.as_ref())?;
let uuid_resolver = uuid_resolver::create_uuid_resolver(&db_path)?;
let index_handle = indexes::create_indexes_handler(&db_path, index_size, &indexer_options)?;
let index_resolver = Arc::new(create_index_resolver(&db_path, index_size, &indexer_options)?);
#[allow(unreachable_code)]
let update_handle = updates::create_update_handler(index_handle.clone(), &db_path, update_store_size)?;
let update_handle = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?;
let dump_path = self.dump_dst.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(
&self
.dump_dst
.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?,
uuid_resolver.clone(),
dump_path,
index_resolver.clone(),
update_handle.clone(),
index_size,
update_store_size,
@ -182,8 +180,7 @@ impl IndexControllerBuilder {
//}
Ok(IndexController {
uuid_resolver,
index_handle,
index_resolver,
update_handle,
dump_handle,
})
@ -246,18 +243,15 @@ impl IndexController {
}
pub async fn register_update(&self, uid: &str, update: Update) -> Result<UpdateStatus> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.to_string()).await;
match uuid {
match self.index_resolver.get_uuid(uid.to_string()).await {
Ok(uuid) => {
let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?;
Ok(update_result)
}
Err(UuidResolverError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4();
Err(IndexResolverError::UnexistingIndex(name)) => {
let (uuid, _) = self.index_resolver.create_index(name, None).await?;
let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?;
// ignore if index creation fails now, since it may already have been created
let _ = IndexMsg::create_index(&self.index_handle, uuid, None).await?;
UuidResolverMsg::insert(&self.uuid_resolver, uuid, name).await?;
Ok(update_result)
}
@ -391,24 +385,24 @@ impl IndexController {
//}
pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let uuid = self.index_resolver.get_uuid(uid).await?;
let result = UpdateMsg::get_update(&self.update_handle, uuid, id).await?;
Ok(result)
}
pub async fn all_update_status(&self, uid: String) -> Result<Vec<UpdateStatus>> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let uuid = self.index_resolver.get_uuid(uid).await?;
let result = UpdateMsg::list_updates(&self.update_handle, uuid).await?;
Ok(result)
}
pub async fn list_indexes(&self) -> Result<Vec<IndexMetadata>> {
let uuids = UuidResolverMsg::list(&self.uuid_resolver).await?;
let indexes = self.index_resolver.list().await?;
let mut ret = Vec::new();
for (uid, uuid) in uuids {
let meta = IndexMsg::index_meta(&self.index_handle, uuid).await?;
for (uid, index) in indexes {
let meta = index.meta()?;
let meta = IndexMetadata {
uuid,
uuid: index.uuid,
name: uid.clone(),
uid,
meta,
@ -420,8 +414,8 @@ impl IndexController {
}
pub async fn settings(&self, uid: String) -> Result<Settings<Checked>> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let settings = IndexMsg::settings(&self.index_handle, uuid).await?;
let index = self.index_resolver.get_index(uid).await?;
let settings = spawn_blocking(move || index.settings()).await??;
Ok(settings)
}
@ -432,15 +426,8 @@ impl IndexController {
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let documents = IndexMsg::documents(
&self.index_handle,
uuid,
offset,
limit,
attributes_to_retrieve,
)
.await?;
let index = self.index_resolver.get_index(uid).await?;
let documents = spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)).await??;
Ok(documents)
}
@ -450,8 +437,8 @@ impl IndexController {
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let document = IndexMsg::document(&self.index_handle, uuid, attributes_to_retrieve, doc_id).await?;
let index = self.index_resolver.get_index(uid).await?;
let document = spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)).await??;
Ok(document)
}
@ -460,12 +447,12 @@ impl IndexController {
uid: String,
mut index_settings: IndexSettings,
) -> Result<IndexMetadata> {
if index_settings.uid.is_some() {
index_settings.uid.take();
}
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?;
let meta = IndexMsg::update_index(&self.index_handle, uuid, index_settings).await?;
index_settings.uid.take();
let index = self.index_resolver.get_index(uid.clone()).await?;
let uuid = index.uuid;
let meta = spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
@ -476,14 +463,15 @@ impl IndexController {
}
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let result = IndexMsg::search(&self.index_handle, uuid, query).await?;
let index = time!(self.index_resolver.get_index(uid.clone()).await?);
let result = time!(spawn_blocking(move || time!(index.perform_search(query))).await??);
Ok(result)
}
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?;
let meta = IndexMsg::index_meta(&self.index_handle, uuid).await?;
let index = self.index_resolver.get_index(uid.clone()).await?;
let uuid = index.uuid;
let meta = spawn_blocking(move || index.meta()).await??;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
@ -494,15 +482,16 @@ impl IndexController {
}
pub async fn get_uuids_size(&self) -> Result<u64> {
let size = UuidResolverMsg::get_size(&self.uuid_resolver).await?;
let size = self.index_resolver.get_size().await?;
Ok(size)
}
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?;
let update_infos = UpdateMsg::get_info(&self.update_handle).await?;
let mut stats = IndexMsg::index_stats(&self.index_handle, uuid).await?;
// Check if the currently indexing update is from out index.
let index = self.index_resolver.get_index(uid).await?;
let uuid = index.uuid;
let mut stats = spawn_blocking(move || index.stats()).await??;
// Check if the currently indexing update is from our index.
stats.is_indexing = Some(Some(uuid) == update_infos.processing);
Ok(stats)
}
@ -513,17 +502,24 @@ impl IndexController {
let mut last_update: Option<DateTime<_>> = None;
let mut indexes = BTreeMap::new();
for index in self.list_indexes().await? {
let mut index_stats = IndexMsg::index_stats(&self.index_handle, index.uuid).await?;
database_size += index_stats.size;
for (index_uid, index) in self.index_resolver.list().await? {
let uuid = index.uuid;
let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || {
let stats = index.stats()?;
let meta = index.meta()?;
Ok((stats, meta))
}).await??;
last_update = last_update.map_or(Some(index.meta.updated_at), |last| {
Some(last.max(index.meta.updated_at))
database_size += stats.size;
last_update = last_update.map_or(Some(meta.updated_at), |last| {
Some(last.max(meta.updated_at))
});
index_stats.is_indexing = Some(Some(index.uuid) == update_infos.processing);
// Check if the currently indexing update is from our index.
stats.is_indexing = Some(Some(uuid) == update_infos.processing);
indexes.insert(index.uid, index_stats);
indexes.insert(index_uid, stats);
}
Ok(Stats {

View File

@ -3,19 +3,17 @@ use std::error::Error;
use meilisearch_error::{Code, ErrorCode};
use crate::index_controller::indexes::error::IndexActorError;
pub type Result<T> = std::result::Result<T, UpdateActorError>;
pub type Result<T> = std::result::Result<T, UpdateLoopError>;
#[derive(Debug, thiserror::Error)]
#[allow(clippy::large_enum_variant)]
pub enum UpdateActorError {
pub enum UpdateLoopError {
#[error("Update {0} not found.")]
UnexistingUpdate(u64),
#[error("Internal error: {0}")]
Internal(Box<dyn Error + Send + Sync + 'static>),
#[error("{0}")]
IndexActor(#[from] IndexActorError),
//#[error("{0}")]
//IndexActor(#[from] IndexActorError),
#[error(
"update store was shut down due to a fatal error, please check your logs for more info."
)]
@ -26,7 +24,7 @@ pub enum UpdateActorError {
PayloadError(#[from] actix_web::error::PayloadError),
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateActorError
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
where T: Sync + Send + 'static + fmt::Debug
{
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
@ -34,28 +32,28 @@ where T: Sync + Send + 'static + fmt::Debug
}
}
impl From<tokio::sync::oneshot::error::RecvError> for UpdateActorError {
impl From<tokio::sync::oneshot::error::RecvError> for UpdateLoopError {
fn from(other: tokio::sync::oneshot::error::RecvError) -> Self {
Self::Internal(Box::new(other))
}
}
internal_error!(
UpdateActorError: heed::Error,
UpdateLoopError: heed::Error,
std::io::Error,
serde_json::Error,
tokio::task::JoinError
);
impl ErrorCode for UpdateActorError {
impl ErrorCode for UpdateLoopError {
fn error_code(&self) -> Code {
match self {
UpdateActorError::UnexistingUpdate(_) => Code::NotFound,
UpdateActorError::Internal(_) => Code::Internal,
UpdateActorError::IndexActor(e) => e.error_code(),
UpdateActorError::FatalUpdateStoreError => Code::Internal,
UpdateActorError::InvalidPayload(_) => Code::BadRequest,
UpdateActorError::PayloadError(error) => match error {
UpdateLoopError::UnexistingUpdate(_) => Code::NotFound,
UpdateLoopError::Internal(_) => Code::Internal,
//UpdateLoopError::IndexActor(e) => e.error_code(),
UpdateLoopError::FatalUpdateStoreError => Code::Internal,
UpdateLoopError::InvalidPayload(_) => Code::BadRequest,
UpdateLoopError::PayloadError(error) => match error {
actix_web::error::PayloadError::Overflow => Code::PayloadTooLarge,
_ => Code::Internal,
},

View File

@ -21,25 +21,25 @@ use serde_json::{Map, Value};
use tokio::sync::mpsc;
use uuid::Uuid;
use self::error::{Result, UpdateActorError};
use self::error::{Result, UpdateLoopError};
pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo};
use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus;
use super::indexes::IndexHandlerSender;
use super::index_resolver::HardStateIndexResolver;
use super::{DocumentAdditionFormat, Payload, Update};
pub type UpdateSender = mpsc::Sender<UpdateMsg>;
pub fn create_update_handler(
index_sender: IndexHandlerSender,
index_resolver: Arc<HardStateIndexResolver>,
db_path: impl AsRef<Path>,
update_store_size: usize,
) -> anyhow::Result<UpdateSender> {
let path = db_path.as_ref().to_owned();
let (sender, receiver) = mpsc::channel(100);
let actor = UpdateLoop::new(update_store_size, receiver, path, index_sender)?;
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
tokio::task::spawn_local(actor.run());
@ -100,7 +100,7 @@ pub struct UpdateLoop {
store: Arc<UpdateStore>,
inbox: Option<mpsc::Receiver<UpdateMsg>>,
update_file_store: UpdateFileStore,
index_handle: IndexHandlerSender,
index_resolver: Arc<HardStateIndexResolver>,
must_exit: Arc<AtomicBool>,
}
@ -109,7 +109,7 @@ impl UpdateLoop {
update_db_size: usize,
inbox: mpsc::Receiver<UpdateMsg>,
path: impl AsRef<Path>,
index_handle: IndexHandlerSender,
index_resolver: Arc<HardStateIndexResolver>,
) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned();
std::fs::create_dir_all(&path)?;
@ -119,7 +119,7 @@ impl UpdateLoop {
let must_exit = Arc::new(AtomicBool::new(false));
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone())?;
let inbox = Some(inbox);
@ -128,9 +128,9 @@ impl UpdateLoop {
Ok(Self {
store,
inbox,
index_handle,
must_exit,
update_file_store,
index_resolver,
})
}
@ -249,7 +249,7 @@ impl UpdateLoop {
tokio::task::spawn_blocking(move || {
let result = store
.meta(uuid, id)?
.ok_or(UpdateActorError::UnexistingUpdate(id))?;
.ok_or(UpdateLoopError::UnexistingUpdate(id))?;
Ok(result)
})
.await?
@ -263,18 +263,19 @@ impl UpdateLoop {
Ok(())
}
async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
async fn handle_snapshot(&self, _uuids: HashSet<Uuid>,_pathh: PathBuf) -> Result<()> {
todo!()
//let index_handle = self.index_resolver.clone();
//let update_store = self.store.clone();
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
.await??;
//tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
//.await??;
Ok(())
//Ok(())
}
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let index_handle = self.index_resolver.clone();
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> Result<()> {

View File

@ -1,16 +1,11 @@
use std::{
collections::HashSet,
fs::{create_dir_all, File},
io::Write,
path::{Path, PathBuf},
};
use std::{collections::HashSet, fs::{create_dir_all, File}, io::Write, path::{Path, PathBuf}, sync::Arc};
use heed::RoTxn;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{Result, State, UpdateStore};
use crate::index_controller::{indexes::{IndexHandlerSender, IndexMsg}, updates::{status::UpdateStatus}};
use crate::index_controller::{index_resolver::HardStateIndexResolver, updates::status::UpdateStatus};
#[derive(Serialize, Deserialize)]
struct UpdateEntry {
@ -23,7 +18,7 @@ impl UpdateStore {
&self,
uuids: &HashSet<Uuid>,
path: PathBuf,
handle: IndexHandlerSender,
handle: Arc<HardStateIndexResolver>,
) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
@ -171,13 +166,14 @@ impl UpdateStore {
}
async fn dump_indexes(
uuids: &HashSet<Uuid>,
handle: IndexHandlerSender,
path: impl AsRef<Path>,
_uuids: &HashSet<Uuid>,
_handle: Arc<HardStateIndexResolver>,
_path: impl AsRef<Path>,
) -> Result<()> {
for uuid in uuids {
IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?;
}
todo!()
//for uuid in uuids {
//IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?;
//}
Ok(())
//Ok(())
}

View File

@ -12,7 +12,6 @@ use std::{
};
use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
@ -30,7 +29,6 @@ use super::RegisterUpdate;
use super::error::Result;
use super::status::{Enqueued, Processing};
use crate::EnvSizer;
use crate::index_controller::indexes::{CONCURRENT_INDEX_MSG, IndexHandlerSender, IndexMsg};
use crate::index_controller::update_files_path;
use crate::index_controller::updates::*;
@ -148,7 +146,7 @@ impl UpdateStore {
pub fn open(
options: EnvOpenOptions,
path: impl AsRef<Path>,
index_handle: IndexHandlerSender,
index_resolver: Arc<HardStateIndexResolver>,
must_exit: Arc<AtomicBool>,
) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::new(options, path)?;
@ -173,7 +171,7 @@ impl UpdateStore {
loop {
match update_store_weak.upgrade() {
Some(update_store) => {
let handler = index_handle.clone();
let handler = index_resolver.clone();
let res = tokio::task::spawn_blocking(move || {
update_store.process_pending_update(handler)
})
@ -286,7 +284,7 @@ impl UpdateStore {
/// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update(&self, index_handle: IndexHandlerSender) -> Result<Option<()>> {
fn process_pending_update(&self, index_resolver: Arc<HardStateIndexResolver>) -> Result<Option<()>> {
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
let first_meta = self.pending_queue.first(&rtxn)?;
@ -303,7 +301,7 @@ impl UpdateStore {
state.swap(State::Processing(index_uuid, processing.clone()));
let result =
self.perform_update(processing, index_handle, index_uuid, global_id);
self.perform_update(processing, index_resolver, index_uuid, global_id);
state.swap(State::Idle);
@ -316,18 +314,18 @@ impl UpdateStore {
fn perform_update(
&self,
processing: Processing,
index_handle: IndexHandlerSender,
index_resolver: Arc<HardStateIndexResolver>,
index_uuid: Uuid,
global_id: u64,
) -> Result<Option<()>> {
// Process the pending update using the provided user function.
let handle = Handle::current();
let update_id = processing.id();
let result =
match handle.block_on(IndexMsg::update(&index_handle, index_uuid, processing.clone())) {
Ok(result) => result,
Err(e) => Err(processing.fail(e)),
};
//IndexMsg::update(index_resolver, index_uuid, processing.clone()
let result = match handle.block_on(index_resolver.get_index_by_uuid(index_uuid)) {
Ok(index) => index.handle_update(processing),
Err(e) => Err(processing.fail(e)),
};
// Once the pending update have been successfully processed
// we must remove the content from the pending and processing stores and
@ -484,9 +482,9 @@ impl UpdateStore {
pub fn snapshot(
&self,
uuids: &HashSet<Uuid>,
_uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
handle: IndexHandlerSender,
handle: Arc<HardStateIndexResolver>,
) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Snapshoting);
@ -522,22 +520,23 @@ impl UpdateStore {
//}
}
let path = &path.as_ref().to_path_buf();
let handle = &handle;
let _path = &path.as_ref().to_path_buf();
let _handle = &handle;
// Perform the snapshot of each index concurently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
let mut stream = futures::stream::iter(uuids.iter())
.map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
todo!()
//let mut stream = futures::stream::iter(uuids.iter())
//.map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone()))
//.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as Result<()>
})?;
//Handle::current().block_on(async {
//while let Some(res) = stream.next().await {
//res?;
//}
//Ok(()) as Result<()>
//})?;
Ok(())
//Ok(())
}
pub fn get_info(&self) -> Result<UpdateStoreInfo> {