get rids of meilisearch-lib

This commit is contained in:
Tamo 2022-09-27 16:33:37 +02:00 committed by Clément Renault
parent 2de8f08517
commit 2d31cff082
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
40 changed files with 398 additions and 2330 deletions

View file

@ -1,8 +0,0 @@
use std::{fs, path::Path};
/// Copy the `instance-uid` contained in one db to another. Ignore all errors.
pub fn copy_user_id(src: &Path, dst: &Path) {
if let Ok(user_id) = fs::read_to_string(src.join("instance-uid")) {
let _ = fs::write(dst.join("instance-uid"), &user_id);
}
}

View file

@ -1,26 +0,0 @@
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::{Archive, Builder};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;
let gz_encoder = GzEncoder::new(&mut f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
f.flush()?;
Ok(())
}
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let f = File::open(&src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(&dest)?;
ar.unpack(&dest)?;
Ok(())
}

View file

@ -1,17 +0,0 @@
pub mod v2;
pub mod v3;
pub mod v4;
/// Parses the v1 version of the Asc ranking rules `asc(price)`and returns the field name.
pub fn asc_ranking_rule(text: &str) -> Option<&str> {
text.split_once("asc(")
.and_then(|(_, tail)| tail.rsplit_once(')'))
.map(|(field, _)| field)
}
/// Parses the v1 version of the Desc ranking rules `desc(price)`and returns the field name.
pub fn desc_ranking_rule(text: &str) -> Option<&str> {
text.split_once("desc(")
.and_then(|(_, tail)| tail.rsplit_once(')'))
.map(|(field, _)| field)
}

View file

@ -1,205 +0,0 @@
use meilisearch_types::error::{Code, ResponseError};
use meilisearch_types::index_uid::IndexUid;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use super::v4::{Task, TaskContent, TaskEvent};
use crate::index::{Settings, Unchecked};
use crate::tasks::task::{DocumentDeletion, TaskId, TaskResult};
use super::v2;
#[derive(Serialize, Deserialize)]
pub struct DumpEntry {
pub uuid: Uuid,
pub uid: String,
}
#[derive(Serialize, Deserialize)]
pub struct UpdateEntry {
pub uuid: Uuid,
pub update: UpdateStatus,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus {
Processing(Processing),
Enqueued(Enqueued),
Processed(Processed),
Failed(Failed),
}
impl From<v2::UpdateResult> for TaskResult {
fn from(other: v2::UpdateResult) -> Self {
match other {
v2::UpdateResult::DocumentsAddition(result) => TaskResult::DocumentAddition {
indexed_documents: result.nb_documents as u64,
},
v2::UpdateResult::DocumentDeletion { deleted } => TaskResult::DocumentDeletion {
deleted_documents: deleted,
},
v2::UpdateResult::Other => TaskResult::Other,
}
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Update {
DeleteDocuments(Vec<String>),
DocumentAddition {
primary_key: Option<String>,
method: IndexDocumentsMethod,
content_uuid: Uuid,
},
Settings(Settings<Unchecked>),
ClearDocuments,
}
impl From<Update> for super::v4::TaskContent {
fn from(update: Update) -> Self {
match update {
Update::DeleteDocuments(ids) => {
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids))
}
Update::DocumentAddition {
primary_key,
method,
..
} => TaskContent::DocumentAddition {
content_uuid: Uuid::default(),
merge_strategy: method,
primary_key,
// document count is unknown for legacy updates
documents_count: 0,
allow_index_creation: true,
},
Update::Settings(settings) => TaskContent::SettingsUpdate {
settings,
// There is no way to know now, so we assume it isn't
is_deletion: false,
allow_index_creation: true,
},
Update::ClearDocuments => TaskContent::DocumentDeletion(DocumentDeletion::Clear),
}
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
DocumentsAddition {
method: IndexDocumentsMethod,
primary_key: Option<String>,
},
ClearDocuments,
DeleteDocuments {
ids: Vec<String>,
},
Settings(Settings<Unchecked>),
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Enqueued {
pub update_id: u64,
pub meta: Update,
#[serde(with = "time::serde::rfc3339")]
pub enqueued_at: OffsetDateTime,
}
impl Enqueued {
fn update_task(self, task: &mut Task) {
// we do not erase the `TaskId` that was given to us.
task.content = self.meta.into();
task.events.push(TaskEvent::Created(self.enqueued_at));
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processed {
pub success: v2::UpdateResult,
#[serde(with = "time::serde::rfc3339")]
pub processed_at: OffsetDateTime,
#[serde(flatten)]
pub from: Processing,
}
impl Processed {
fn update_task(self, task: &mut Task) {
self.from.update_task(task);
let event = TaskEvent::Succeded {
result: TaskResult::from(self.success),
timestamp: self.processed_at,
};
task.events.push(event);
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processing {
#[serde(flatten)]
pub from: Enqueued,
#[serde(with = "time::serde::rfc3339")]
pub started_processing_at: OffsetDateTime,
}
impl Processing {
fn update_task(self, task: &mut Task) {
self.from.update_task(task);
let event = TaskEvent::Processing(self.started_processing_at);
task.events.push(event);
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
#[serde(flatten)]
pub from: Processing,
pub msg: String,
pub code: Code,
#[serde(with = "time::serde::rfc3339")]
pub failed_at: OffsetDateTime,
}
impl Failed {
fn update_task(self, task: &mut Task) {
self.from.update_task(task);
let event = TaskEvent::Failed {
error: ResponseError::from_msg(self.msg, self.code),
timestamp: self.failed_at,
};
task.events.push(event);
}
}
impl From<(UpdateStatus, String, TaskId)> for Task {
fn from((update, uid, task_id): (UpdateStatus, String, TaskId)) -> Self {
// Dummy task
let mut task = super::v4::Task {
id: task_id,
index_uid: IndexUid::new_unchecked(uid),
content: super::v4::TaskContent::IndexDeletion,
events: Vec::new(),
};
match update {
UpdateStatus::Processing(u) => u.update_task(&mut task),
UpdateStatus::Enqueued(u) => u.update_task(&mut task),
UpdateStatus::Processed(u) => u.update_task(&mut task),
UpdateStatus::Failed(u) => u.update_task(&mut task),
}
task
}
}

View file

@ -1,145 +0,0 @@
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::index::{Settings, Unchecked};
use crate::tasks::batch::BatchId;
use crate::tasks::task::{
DocumentDeletion, TaskContent as NewTaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult,
};
#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
pub id: TaskId,
pub index_uid: IndexUid,
pub content: TaskContent,
pub events: Vec<TaskEvent>,
}
impl From<Task> for crate::tasks::task::Task {
fn from(other: Task) -> Self {
Self {
id: other.id,
content: NewTaskContent::from((other.index_uid, other.content)),
events: other.events.into_iter().map(Into::into).collect(),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum TaskEvent {
Created(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
Batched {
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
batch_id: BatchId,
},
Processing(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
Succeded {
result: TaskResult,
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
},
Failed {
error: ResponseError,
#[serde(with = "time::serde::rfc3339")]
timestamp: OffsetDateTime,
},
}
impl From<TaskEvent> for NewTaskEvent {
fn from(other: TaskEvent) -> Self {
match other {
TaskEvent::Created(x) => NewTaskEvent::Created(x),
TaskEvent::Batched {
timestamp,
batch_id,
} => NewTaskEvent::Batched {
timestamp,
batch_id,
},
TaskEvent::Processing(x) => NewTaskEvent::Processing(x),
TaskEvent::Succeded { result, timestamp } => {
NewTaskEvent::Succeeded { result, timestamp }
}
TaskEvent::Failed { error, timestamp } => NewTaskEvent::Failed { error, timestamp },
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum TaskContent {
DocumentAddition {
content_uuid: Uuid,
merge_strategy: IndexDocumentsMethod,
primary_key: Option<String>,
documents_count: usize,
allow_index_creation: bool,
},
DocumentDeletion(DocumentDeletion),
SettingsUpdate {
settings: Settings<Unchecked>,
/// Indicates whether the task was a deletion
is_deletion: bool,
allow_index_creation: bool,
},
IndexDeletion,
IndexCreation {
primary_key: Option<String>,
},
IndexUpdate {
primary_key: Option<String>,
},
Dump {
uid: String,
},
}
impl From<(IndexUid, TaskContent)> for NewTaskContent {
fn from((index_uid, content): (IndexUid, TaskContent)) -> Self {
match content {
TaskContent::DocumentAddition {
content_uuid,
merge_strategy,
primary_key,
documents_count,
allow_index_creation,
} => NewTaskContent::DocumentAddition {
index_uid,
content_uuid,
merge_strategy,
primary_key,
documents_count,
allow_index_creation,
},
TaskContent::DocumentDeletion(deletion) => NewTaskContent::DocumentDeletion {
index_uid,
deletion,
},
TaskContent::SettingsUpdate {
settings,
is_deletion,
allow_index_creation,
} => NewTaskContent::SettingsUpdate {
index_uid,
settings,
is_deletion,
allow_index_creation,
},
TaskContent::IndexDeletion => NewTaskContent::IndexDeletion { index_uid },
TaskContent::IndexCreation { primary_key } => NewTaskContent::IndexCreation {
index_uid,
primary_key,
},
TaskContent::IndexUpdate { primary_key } => NewTaskContent::IndexUpdate {
index_uid,
primary_key,
},
TaskContent::Dump { uid } => NewTaskContent::Dump { uid },
}
}
}

View file

@ -1,42 +0,0 @@
use meilisearch_auth::error::AuthControllerError;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::internal_error;
use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError};
pub type Result<T> = std::result::Result<T, DumpError>;
#[derive(thiserror::Error, Debug)]
pub enum DumpError {
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")]
IndexResolver(Box<IndexResolverError>),
}
internal_error!(
DumpError: milli::heed::Error,
std::io::Error,
tokio::task::JoinError,
tokio::sync::oneshot::error::RecvError,
serde_json::error::Error,
tempfile::PersistError,
fs_extra::error::Error,
AuthControllerError,
TaskError
);
impl From<IndexResolverError> for DumpError {
fn from(e: IndexResolverError) -> Self {
Self::IndexResolver(Box::new(e))
}
}
impl ErrorCode for DumpError {
fn error_code(&self) -> Code {
match self {
DumpError::Internal(_) => Code::Internal,
DumpError::IndexResolver(e) => e.error_code(),
}
}
}

View file

@ -1,188 +0,0 @@
#[cfg(not(test))]
pub use real::DumpHandler;
#[cfg(test)]
pub use test::MockDumpHandler as DumpHandler;
use time::{macros::format_description, OffsetDateTime};
/// Generate uid from creation date
pub fn generate_uid() -> String {
OffsetDateTime::now_utc()
.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
))
.unwrap()
}
mod real {
use std::path::PathBuf;
use std::sync::Arc;
use log::{info, trace};
use meilisearch_auth::AuthController;
use milli::heed::Env;
use tokio::fs::create_dir_all;
use tokio::io::AsyncWriteExt;
use crate::analytics;
use crate::compression::to_tar_gz;
use crate::dump::error::{DumpError, Result};
use crate::dump::{MetadataVersion, META_FILE_NAME};
use crate::index_resolver::{
index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver,
};
use crate::tasks::TaskStore;
use crate::update_file_store::UpdateFileStore;
pub struct DumpHandler<U, I> {
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store_size: usize,
index_db_size: usize,
env: Arc<Env>,
index_resolver: Arc<IndexResolver<U, I>>,
}
impl<U, I> DumpHandler<U, I>
where
U: IndexMetaStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new(
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store_size: usize,
index_db_size: usize,
env: Arc<Env>,
index_resolver: Arc<IndexResolver<U, I>>,
) -> Self {
Self {
dump_path,
db_path,
update_file_store,
task_store_size,
index_db_size,
env,
index_resolver,
}
}
pub async fn run(&self, uid: String) -> Result<()> {
trace!("Performing dump.");
create_dir_all(&self.dump_path).await?;
let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
let meta_bytes = serde_json::to_vec(&meta)?;
let mut meta_file = tokio::fs::File::create(&meta_path).await?;
meta_file.write_all(&meta_bytes).await?;
analytics::copy_user_id(&self.db_path, &temp_dump_path);
create_dir_all(&temp_dump_path.join("indexes")).await?;
let db_path = self.db_path.clone();
let temp_dump_path_clone = temp_dump_path.clone();
tokio::task::spawn_blocking(move || -> Result<()> {
AuthController::dump(db_path, temp_dump_path_clone)?;
Ok(())
})
.await??;
TaskStore::dump(
self.env.clone(),
&temp_dump_path,
self.update_file_store.clone(),
)
.await?;
self.index_resolver.dump(&temp_dump_path).await?;
let dump_path = self.dump_path.clone();
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
// for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out.
let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpError::Internal(e.into()))?;
let dump_path = dump_path.join(uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
}
}
#[cfg(test)]
mod test {
use std::path::PathBuf;
use std::sync::Arc;
use milli::heed::Env;
use nelson::Mocker;
use crate::dump::error::Result;
use crate::index_resolver::IndexResolver;
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
use crate::update_file_store::UpdateFileStore;
use super::*;
pub enum MockDumpHandler<U, I> {
Real(super::real::DumpHandler<U, I>),
Mock(Mocker),
}
impl<U, I> MockDumpHandler<U, I> {
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(mocker)
}
}
impl<U, I> MockDumpHandler<U, I>
where
U: IndexMetaStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new(
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store_size: usize,
index_db_size: usize,
env: Arc<Env>,
index_resolver: Arc<IndexResolver<U, I>>,
) -> Self {
Self::Real(super::real::DumpHandler::new(
dump_path,
db_path,
update_file_store,
task_store_size,
index_db_size,
env,
index_resolver,
))
}
pub async fn run(&self, uid: String) -> Result<()> {
match self {
DumpHandler::Real(real) => real.run(uid).await,
DumpHandler::Mock(mocker) => unsafe { mocker.get("run").call(uid) },
}
}
}
}

View file

@ -1,4 +0,0 @@
pub mod v2;
pub mod v3;
pub mod v4;
pub mod v5;

View file

@ -1,216 +0,0 @@
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use serde_json::{Deserializer, Value};
use tempfile::NamedTempFile;
use crate::dump::compat::{self, v2, v3};
use crate::dump::Metadata;
use crate::options::IndexerOpts;
/// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a
/// dump v3, then calls the dump v3 to actually handle the dump.
pub fn load_dump(
meta: Metadata,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
log::info!("Patching dump V2 to dump V3...");
let indexes_path = src.as_ref().join("indexes");
let dir_entries = std::fs::read_dir(indexes_path)?;
for entry in dir_entries {
let entry = entry?;
// rename the index folder
let path = entry.path();
let new_path = patch_index_uuid_path(&path).expect("invalid index folder.");
std::fs::rename(path, &new_path)?;
let settings_path = new_path.join("meta.json");
patch_settings(settings_path)?;
}
let update_dir = src.as_ref().join("updates");
let update_path = update_dir.join("data.jsonl");
patch_updates(update_dir, update_path)?;
super::v3::load_dump(
meta,
src,
dst,
index_db_size,
update_db_size,
indexing_options,
)
}
fn patch_index_uuid_path(path: &Path) -> Option<PathBuf> {
let uuid = path.file_name()?.to_str()?.trim_start_matches("index-");
let new_path = path.parent()?.join(uuid);
Some(new_path)
}
fn patch_settings(path: impl AsRef<Path>) -> anyhow::Result<()> {
let mut meta_file = File::open(&path)?;
let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
// We first deserialize the dump meta into a serde_json::Value and change
// the custom ranking rules settings from the old format to the new format.
if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
patch_custom_ranking_rules(ranking_rules);
}
let mut meta_file = OpenOptions::new().truncate(true).write(true).open(path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
Ok(())
}
fn patch_updates(dir: impl AsRef<Path>, path: impl AsRef<Path>) -> anyhow::Result<()> {
let mut output_update_file = NamedTempFile::new_in(&dir)?;
let update_file = File::open(&path)?;
let stream = Deserializer::from_reader(update_file).into_iter::<v2::UpdateEntry>();
for update in stream {
let update_entry = update?;
let update_entry = v3::UpdateEntry::from(update_entry);
serde_json::to_writer(&mut output_update_file, &update_entry)?;
output_update_file.write_all(b"\n")?;
}
output_update_file.flush()?;
output_update_file.persist(path)?;
Ok(())
}
/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
///
/// This is done for compatibility reasons, and to avoid a new dump version,
/// since the new syntax was introduced soon after the new dump version.
fn patch_custom_ranking_rules(ranking_rules: &mut Value) {
*ranking_rules = match ranking_rules.take() {
Value::Array(values) => values
.into_iter()
.filter_map(|value| match value {
Value::String(s) if s.starts_with("asc") => compat::asc_ranking_rule(&s)
.map(|f| format!("{}:asc", f))
.map(Value::String),
Value::String(s) if s.starts_with("desc") => compat::desc_ranking_rule(&s)
.map(|f| format!("{}:desc", f))
.map(Value::String),
otherwise => Some(otherwise),
})
.collect(),
otherwise => otherwise,
}
}
impl From<v2::UpdateEntry> for v3::UpdateEntry {
fn from(v2::UpdateEntry { uuid, update }: v2::UpdateEntry) -> Self {
let update = match update {
v2::UpdateStatus::Processing(meta) => v3::UpdateStatus::Processing(meta.into()),
v2::UpdateStatus::Enqueued(meta) => v3::UpdateStatus::Enqueued(meta.into()),
v2::UpdateStatus::Processed(meta) => v3::UpdateStatus::Processed(meta.into()),
v2::UpdateStatus::Aborted(_) => unreachable!("Updates could never be aborted."),
v2::UpdateStatus::Failed(meta) => v3::UpdateStatus::Failed(meta.into()),
};
Self { uuid, update }
}
}
impl From<v2::Failed> for v3::Failed {
fn from(other: v2::Failed) -> Self {
let v2::Failed {
from,
error,
failed_at,
} = other;
Self {
from: from.into(),
msg: error.message,
code: v2::error_code_from_str(&error.error_code)
.expect("Invalid update: Invalid error code"),
failed_at,
}
}
}
impl From<v2::Processing> for v3::Processing {
fn from(other: v2::Processing) -> Self {
let v2::Processing {
from,
started_processing_at,
} = other;
Self {
from: from.into(),
started_processing_at,
}
}
}
impl From<v2::Enqueued> for v3::Enqueued {
fn from(other: v2::Enqueued) -> Self {
let v2::Enqueued {
update_id,
meta,
enqueued_at,
content,
} = other;
let meta = match meta {
v2::UpdateMeta::DocumentsAddition {
method,
primary_key,
..
} => {
v3::Update::DocumentAddition {
primary_key,
method,
// Just ignore if the uuid is no present. If it is needed later, an error will
// be thrown.
content_uuid: content.unwrap_or_default(),
}
}
v2::UpdateMeta::ClearDocuments => v3::Update::ClearDocuments,
v2::UpdateMeta::DeleteDocuments { ids } => v3::Update::DeleteDocuments(ids),
v2::UpdateMeta::Settings(settings) => v3::Update::Settings(settings),
};
Self {
update_id,
meta,
enqueued_at,
}
}
}
impl From<v2::Processed> for v3::Processed {
fn from(other: v2::Processed) -> Self {
let v2::Processed {
from,
success,
processed_at,
} = other;
Self {
success,
processed_at,
from: from.into(),
}
}
}

View file

@ -1,136 +0,0 @@
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
use anyhow::Context;
use fs_extra::dir::{self, CopyOptions};
use log::info;
use tempfile::tempdir;
use uuid::Uuid;
use crate::dump::compat::{self, v3};
use crate::dump::Metadata;
use crate::index_resolver::meta_store::{DumpEntry, IndexMeta};
use crate::options::IndexerOpts;
use crate::tasks::task::TaskId;
/// dump structure for V3:
/// .
/// ├── indexes
/// │   └── 25f10bb8-6ea8-42f0-bd48-ad5857f77648
/// │   ├── documents.jsonl
/// │   └── meta.json
/// ├── index_uuids
/// │   └── data.jsonl
/// ├── metadata.json
/// └── updates
/// └── data.jsonl
pub fn load_dump(
meta: Metadata,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!("Patching dump V3 to dump V4...");
let patched_dir = tempdir()?;
let options = CopyOptions::default();
dir::copy(src.as_ref().join("indexes"), patched_dir.path(), &options)?;
dir::copy(
src.as_ref().join("index_uuids"),
patched_dir.path(),
&options,
)?;
let uuid_map = patch_index_meta(
src.as_ref().join("index_uuids/data.jsonl"),
patched_dir.path(),
)?;
fs::copy(
src.as_ref().join("metadata.json"),
patched_dir.path().join("metadata.json"),
)?;
patch_updates(&src, patched_dir.path(), uuid_map)?;
super::v4::load_dump(
meta,
patched_dir.path(),
dst,
index_db_size,
meta_env_size,
indexing_options,
)
}
fn patch_index_meta(
path: impl AsRef<Path>,
dst: impl AsRef<Path>,
) -> anyhow::Result<HashMap<Uuid, String>> {
let file = BufReader::new(File::open(path)?);
let dst = dst.as_ref().join("index_uuids");
fs::create_dir_all(&dst)?;
let mut dst_file = File::create(dst.join("data.jsonl"))?;
let map = serde_json::Deserializer::from_reader(file)
.into_iter::<v3::DumpEntry>()
.try_fold(HashMap::new(), |mut map, entry| -> anyhow::Result<_> {
let entry = entry?;
map.insert(entry.uuid, entry.uid.clone());
let meta = IndexMeta {
uuid: entry.uuid,
// This is lost information, we patch it to 0;
creation_task_id: 0,
};
let entry = DumpEntry {
uid: entry.uid,
index_meta: meta,
};
serde_json::to_writer(&mut dst_file, &entry)?;
dst_file.write_all(b"\n")?;
Ok(map)
})?;
dst_file.flush()?;
Ok(map)
}
fn patch_updates(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
uuid_map: HashMap<Uuid, String>,
) -> anyhow::Result<()> {
let dst = dst.as_ref().join("updates");
fs::create_dir_all(&dst)?;
let mut dst_file = BufWriter::new(File::create(dst.join("data.jsonl"))?);
let src_file = BufReader::new(File::open(src.as_ref().join("updates/data.jsonl"))?);
serde_json::Deserializer::from_reader(src_file)
.into_iter::<v3::UpdateEntry>()
.enumerate()
.try_for_each(|(task_id, entry)| -> anyhow::Result<()> {
let entry = entry?;
let name = uuid_map
.get(&entry.uuid)
.with_context(|| format!("Unknown index uuid: {}", entry.uuid))?
.clone();
serde_json::to_writer(
&mut dst_file,
&compat::v4::Task::from((entry.update, name, task_id as TaskId)),
)?;
dst_file.write_all(b"\n")?;
Ok(())
})?;
dst_file.flush()?;
Ok(())
}

View file

@ -1,47 +0,0 @@
use std::{path::Path, sync::Arc};
use log::info;
use meilisearch_auth::AuthController;
use milli::heed::EnvOpenOptions;
use crate::analytics;
use crate::dump::Metadata;
use crate::index_resolver::IndexResolver;
use crate::options::IndexerOpts;
use crate::tasks::TaskStore;
use crate::update_file_store::UpdateFileStore;
pub fn load_dump(
meta: Metadata,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V5",
meta.dump_date, meta.db_version
);
let mut options = EnvOpenOptions::new();
options.map_size(meta_env_size);
options.max_dbs(100);
let env = Arc::new(options.open(&dst)?);
IndexResolver::load_dump(
src.as_ref(),
&dst,
index_db_size,
env.clone(),
indexing_options,
)?;
UpdateFileStore::load_dump(src.as_ref(), &dst)?;
TaskStore::load_dump(&src, env)?;
AuthController::load_dump(&src, &dst)?;
analytics::copy_user_id(src.as_ref(), dst.as_ref());
info!("Loading indexes.");
Ok(())
}

View file

@ -1,262 +0,0 @@
use std::fs::File;
use std::path::Path;
use anyhow::bail;
use log::info;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tempfile::TempDir;
use crate::compression::from_tar_gz;
use crate::options::IndexerOpts;
use self::loaders::{v2, v3, v4, v5};
pub use handler::{generate_uid, DumpHandler};
mod compat;
pub mod error;
mod handler;
mod loaders;
const META_FILE_NAME: &str = "metadata.json";
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
db_version: String,
index_db_size: usize,
update_db_size: usize,
#[serde(with = "time::serde::rfc3339")]
dump_date: OffsetDateTime,
}
impl Metadata {
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
Self {
db_version: env!("CARGO_PKG_VERSION").to_string(),
index_db_size,
update_db_size,
dump_date: OffsetDateTime::now_utc(),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
pub db_version: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "dumpVersion")]
pub enum MetadataVersion {
V1(MetadataV1),
V2(Metadata),
V3(Metadata),
V4(Metadata),
// V5 is forward compatible with V4 but not backward compatible.
V5(Metadata),
}
impl MetadataVersion {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
match self {
MetadataVersion::V1(_meta) => {
anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")
}
MetadataVersion::V2(meta) => v2::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V3(meta) => v3::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V4(meta) => v4::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V5(meta) => v5::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
}
Ok(())
}
pub fn new_v5(index_db_size: usize, update_db_size: usize) -> Self {
let meta = Metadata::new(index_db_size, update_db_size);
Self::V5(meta)
}
pub fn db_version(&self) -> &str {
match self {
Self::V1(meta) => &meta.db_version,
Self::V2(meta) | Self::V3(meta) | Self::V4(meta) | Self::V5(meta) => &meta.db_version,
}
}
pub fn version(&self) -> &'static str {
match self {
MetadataVersion::V1(_) => "V1",
MetadataVersion::V2(_) => "V2",
MetadataVersion::V3(_) => "V3",
MetadataVersion::V4(_) => "V4",
MetadataVersion::V5(_) => "V5",
}
}
pub fn dump_date(&self) -> Option<&OffsetDateTime> {
match self {
MetadataVersion::V1(_) => None,
MetadataVersion::V2(meta)
| MetadataVersion::V3(meta)
| MetadataVersion::V4(meta)
| MetadataVersion::V5(meta) => Some(&meta.dump_date),
}
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DumpStatus {
Done,
InProgress,
Failed,
}
pub fn load_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
ignore_dump_if_db_exists: bool,
ignore_missing_dump: bool,
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
let empty_db = crate::is_empty_db(&dst_path);
let src_path_exists = src_path.as_ref().exists();
if empty_db && src_path_exists {
let (tmp_src, tmp_dst, meta) = extract_dump(&dst_path, &src_path)?;
meta.load_dump(
tmp_src.path(),
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?;
persist_dump(&dst_path, tmp_dst)?;
Ok(())
} else if !empty_db && !ignore_dump_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
dst_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| dst_path.as_ref().to_owned())
)
} else if !src_path_exists && !ignore_missing_dump {
bail!("dump doesn't exist at {:?}", src_path.as_ref())
} else {
// there is nothing to do
Ok(())
}
}
fn extract_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
) -> anyhow::Result<(TempDir, TempDir, MetadataVersion)> {
// Setup a temp directory path in the same path as the database, to prevent cross devices
// references.
let temp_path = dst_path
.as_ref()
.parent()
.map(ToOwned::to_owned)
.unwrap_or_else(|| ".".into());
let tmp_src = tempfile::tempdir_in(temp_path)?;
let tmp_src_path = tmp_src.path();
from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?;
let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?;
if !dst_path.as_ref().exists() {
std::fs::create_dir_all(dst_path.as_ref())?;
}
let tmp_dst = tempfile::tempdir_in(dst_path.as_ref())?;
info!(
"Loading dump {}, dump database version: {}, dump version: {}",
meta.dump_date()
.map(|t| format!("from {}", t))
.unwrap_or_else(String::new),
meta.db_version(),
meta.version()
);
Ok((tmp_src, tmp_dst, meta))
}
fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<()> {
let persisted_dump = tmp_dst.into_path();
// Delete everything in the `data.ms` except the tempdir.
if dst_path.as_ref().exists() {
for file in dst_path.as_ref().read_dir().unwrap() {
let file = file.unwrap().path();
if file.file_name() == persisted_dump.file_name() {
continue;
}
if file.is_file() {
std::fs::remove_file(&file)?;
} else {
std::fs::remove_dir_all(&file)?;
}
}
}
// Move the whole content of the tempdir into the `data.ms`.
for file in persisted_dump.read_dir().unwrap() {
let file = file.unwrap().path();
std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?;
}
// Delete the empty tempdir.
std::fs::remove_dir_all(&persisted_dump)?;
Ok(())
}

View file

@ -1,55 +0,0 @@
use std::error::Error;
use std::fmt;
use meilisearch_types::error::{Code, ErrorCode};
use milli::UserError;
#[derive(Debug)]
pub struct MilliError<'a>(pub &'a milli::Error);
impl Error for MilliError<'_> {}
impl fmt::Display for MilliError<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl ErrorCode for MilliError<'_> {
fn error_code(&self) -> Code {
match self.0 {
milli::Error::InternalError(_) => Code::Internal,
milli::Error::IoError(_) => Code::Internal,
milli::Error::UserError(ref error) => {
match error {
// TODO: wait for spec for new error codes.
UserError::SerdeJson(_)
| UserError::InvalidLmdbOpenOptions
| UserError::DocumentLimitReached
| UserError::AccessingSoftDeletedDocument { .. }
| UserError::UnknownInternalDocumentId { .. } => Code::Internal,
UserError::InvalidStoreFile => Code::InvalidStore,
UserError::NoSpaceLeftOnDevice => Code::NoSpaceLeftOnDevice,
UserError::MaxDatabaseSizeReached => Code::DatabaseSizeLimitReached,
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
UserError::InvalidFilter(_) => Code::Filter,
UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
Code::InvalidDocumentId
}
UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
UserError::SortRankingRuleMissing => Code::Sort,
UserError::InvalidFacetsDistribution { .. } => Code::BadRequest,
UserError::InvalidSortableAttribute { .. } => Code::Sort,
UserError::CriterionError(_) => Code::InvalidRankingRule,
UserError::InvalidGeoField { .. } => Code::InvalidGeoField,
UserError::SortError(_) => Code::Sort,
UserError::InvalidMinTypoWordLenSetting(_, _) => {
Code::InvalidMinWordLengthForTypo
}
}
}
}
}
}

View file

@ -1,66 +0,0 @@
use std::error::Error;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::index_uid::IndexUidFormatError;
use meilisearch_types::internal_error;
use tokio::task::JoinError;
use super::DocumentAdditionFormat;
use crate::document_formats::DocumentFormatError;
// use crate::dump::error::DumpError;
use index::error::IndexError;
pub type Result<T> = std::result::Result<T, IndexControllerError>;
#[derive(Debug, thiserror::Error)]
pub enum IndexControllerError {
#[error("Index creation must have an uid")]
MissingUid,
#[error(transparent)]
IndexResolver(#[from] index_scheduler::Error),
#[error(transparent)]
IndexError(#[from] IndexError),
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn Error + Send + Sync + 'static>),
// #[error("{0}")]
// DumpError(#[from] DumpError),
#[error(transparent)]
DocumentFormatError(#[from] DocumentFormatError),
#[error("A {0} payload is missing.")]
MissingPayload(DocumentAdditionFormat),
#[error("The provided payload reached the size limit.")]
PayloadTooLarge,
}
internal_error!(IndexControllerError: JoinError, file_store::Error);
impl From<actix_web::error::PayloadError> for IndexControllerError {
fn from(other: actix_web::error::PayloadError) -> Self {
match other {
actix_web::error::PayloadError::Overflow => Self::PayloadTooLarge,
_ => Self::Internal(Box::new(other)),
}
}
}
impl ErrorCode for IndexControllerError {
fn error_code(&self) -> Code {
match self {
IndexControllerError::MissingUid => Code::BadRequest,
IndexControllerError::Internal(_) => Code::Internal,
IndexControllerError::DocumentFormatError(e) => e.error_code(),
IndexControllerError::MissingPayload(_) => Code::MissingPayload,
IndexControllerError::PayloadTooLarge => Code::PayloadTooLarge,
IndexControllerError::IndexResolver(e) => e.error_code(),
IndexControllerError::IndexError(e) => e.error_code(),
}
}
}
/*
impl From<IndexUidFormatError> for IndexControllerError {
fn from(err: IndexUidFormatError) -> Self {
index_scheduler::Error::from(err).into()
}
}
*/

View file

@ -1,565 +0,0 @@
use std::collections::BTreeMap;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use actix_web::error::PayloadError;
use bytes::Bytes;
use futures::Stream;
use index_scheduler::task::{Status, Task};
use index_scheduler::{IndexScheduler, KindWithContent, TaskId, TaskView};
use meilisearch_auth::SearchRules;
use milli::update::{IndexDocumentsMethod, IndexerConfig};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use uuid::Uuid;
// use crate::dump::{self, load_dump, DumpHandler};
use crate::options::{IndexerOpts, SchedulerConfig};
// use crate::snapshot::{load_snapshot, SnapshotService};
use error::Result;
use index::{
Checked, Document, Index, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
};
pub mod error;
pub mod versioning;
pub type Payload = Box<
dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
>;
pub fn open_meta_env(path: &Path, size: usize) -> milli::heed::Result<milli::heed::Env> {
let mut options = milli::heed::EnvOpenOptions::new();
options.map_size(size);
options.max_dbs(20);
options.open(path)
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMetadata {
#[serde(skip)]
pub uuid: Uuid,
pub uid: String,
#[serde(flatten)]
pub meta: IndexMeta,
}
#[derive(Clone, Debug)]
pub struct IndexSettings {
pub uid: Option<String>,
pub primary_key: Option<String>,
}
#[derive(Clone)]
pub struct Meilisearch {
index_scheduler: IndexScheduler,
}
impl std::ops::Deref for Meilisearch {
type Target = IndexScheduler;
fn deref(&self) -> &Self::Target {
&self.index_scheduler
}
}
#[derive(Debug)]
pub enum DocumentAdditionFormat {
Json,
Csv,
Ndjson,
}
impl fmt::Display for DocumentAdditionFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DocumentAdditionFormat::Json => write!(f, "json"),
DocumentAdditionFormat::Ndjson => write!(f, "ndjson"),
DocumentAdditionFormat::Csv => write!(f, "csv"),
}
}
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
pub database_size: u64,
#[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
pub last_update: Option<OffsetDateTime>,
pub indexes: BTreeMap<String, IndexStats>,
}
#[allow(clippy::large_enum_variant)]
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub enum Update {
DeleteDocuments(Vec<String>),
ClearDocuments,
Settings {
settings: Settings<Unchecked>,
/// Indicates whether the update was a deletion
is_deletion: bool,
allow_index_creation: bool,
},
DocumentAddition {
#[derivative(Debug = "ignore")]
payload: Payload,
primary_key: Option<String>,
method: IndexDocumentsMethod,
format: DocumentAdditionFormat,
allow_index_creation: bool,
},
DeleteIndex,
CreateIndex {
primary_key: Option<String>,
},
UpdateIndex {
primary_key: Option<String>,
},
}
#[derive(Default, Debug)]
pub struct IndexControllerBuilder {
max_index_size: Option<usize>,
max_task_store_size: Option<usize>,
snapshot_dir: Option<PathBuf>,
import_snapshot: Option<PathBuf>,
snapshot_interval: Option<Duration>,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool,
schedule_snapshot: bool,
dump_src: Option<PathBuf>,
dump_dst: Option<PathBuf>,
ignore_dump_if_db_exists: bool,
ignore_missing_dump: bool,
}
impl IndexControllerBuilder {
pub fn build(
self,
db_path: impl AsRef<Path>,
indexer_options: IndexerOpts,
scheduler_config: SchedulerConfig,
) -> anyhow::Result<Meilisearch> {
let index_size = self
.max_index_size
.ok_or_else(|| anyhow::anyhow!("Missing index size"))?;
let task_store_size = self
.max_task_store_size
.ok_or_else(|| anyhow::anyhow!("Missing update database size"))?;
/*
TODO: TAMO: enable dumps and snapshots to happens
if let Some(ref path) = self.import_snapshot {
log::info!("Loading from snapshot {:?}", path);
load_snapshot(
db_path.as_ref(),
path,
self.ignore_snapshot_if_db_exists,
self.ignore_missing_snapshot,
)?;
} else if let Some(ref src_path) = self.dump_src {
load_dump(
db_path.as_ref(),
src_path,
self.ignore_dump_if_db_exists,
self.ignore_missing_dump,
index_size,
task_store_size,
&indexer_options,
)?;
} else if db_path.as_ref().exists() {
// Directory could be pre-created without any database in.
let db_is_empty = db_path.as_ref().read_dir()?.next().is_none();
if !db_is_empty {
versioning::check_version_file(db_path.as_ref())?;
}
}
*/
std::fs::create_dir_all(db_path.as_ref())?;
let meta_env = Arc::new(open_meta_env(db_path.as_ref(), task_store_size)?);
// Create or overwrite the version file for this DB
versioning::create_version_file(db_path.as_ref())?;
let indexer_config = IndexerConfig {
log_every_n: Some(indexer_options.log_every_n),
max_nb_chunks: indexer_options.max_nb_chunks,
documents_chunk_size: None,
// TODO: TAMO: Fix this thing
max_memory: None, // Some(indexer_options.max_indexing_memory.into()),
chunk_compression_type: milli::CompressionType::None,
chunk_compression_level: None,
// TODO: TAMO: do something with the indexing_config.max_indexing_threads
thread_pool: None,
max_positions_per_attributes: None,
};
let index_scheduler = IndexScheduler::new(
db_path.as_ref().join("tasks"),
db_path.as_ref().join("update_files"),
db_path.as_ref().join("indexes"),
index_size,
indexer_config,
)?;
/*
if self.schedule_snapshot {
let snapshot_period = self
.snapshot_interval
.ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?;
let snapshot_path = self
.snapshot_dir
.ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?;
let snapshot_service = SnapshotService {
db_path: db_path.as_ref().to_path_buf(),
snapshot_period,
snapshot_path,
index_size,
meta_env_size: task_store_size,
scheduler: scheduler.clone(),
};
tokio::task::spawn_local(snapshot_service.run());
}
*/
Ok(Meilisearch { index_scheduler })
}
/// Set the index controller builder's max update store size.
pub fn set_max_task_store_size(&mut self, max_update_store_size: usize) -> &mut Self {
let max_update_store_size = clamp_to_page_size(max_update_store_size);
self.max_task_store_size.replace(max_update_store_size);
self
}
pub fn set_max_index_size(&mut self, size: usize) -> &mut Self {
let size = clamp_to_page_size(size);
self.max_index_size.replace(size);
self
}
/// Set the index controller builder's snapshot path.
pub fn set_snapshot_dir(&mut self, snapshot_dir: PathBuf) -> &mut Self {
self.snapshot_dir.replace(snapshot_dir);
self
}
/// Set the index controller builder's ignore snapshot if db exists.
pub fn set_ignore_snapshot_if_db_exists(
&mut self,
ignore_snapshot_if_db_exists: bool,
) -> &mut Self {
self.ignore_snapshot_if_db_exists = ignore_snapshot_if_db_exists;
self
}
/// Set the index controller builder's ignore missing snapshot.
pub fn set_ignore_missing_snapshot(&mut self, ignore_missing_snapshot: bool) -> &mut Self {
self.ignore_missing_snapshot = ignore_missing_snapshot;
self
}
/// Set the index controller builder's import snapshot.
pub fn set_import_snapshot(&mut self, import_snapshot: PathBuf) -> &mut Self {
self.import_snapshot.replace(import_snapshot);
self
}
/// Set the index controller builder's snapshot interval sec.
pub fn set_snapshot_interval(&mut self, snapshot_interval: Duration) -> &mut Self {
self.snapshot_interval = Some(snapshot_interval);
self
}
/// Set the index controller builder's schedule snapshot.
pub fn set_schedule_snapshot(&mut self) -> &mut Self {
self.schedule_snapshot = true;
self
}
/// Set the index controller builder's dump src.
pub fn set_dump_src(&mut self, dump_src: PathBuf) -> &mut Self {
self.dump_src.replace(dump_src);
self
}
/// Set the index controller builder's dump dst.
pub fn set_dump_dst(&mut self, dump_dst: PathBuf) -> &mut Self {
self.dump_dst.replace(dump_dst);
self
}
/// Set the index controller builder's ignore dump if db exists.
pub fn set_ignore_dump_if_db_exists(&mut self, ignore_dump_if_db_exists: bool) -> &mut Self {
self.ignore_dump_if_db_exists = ignore_dump_if_db_exists;
self
}
/// Set the index controller builder's ignore missing dump.
pub fn set_ignore_missing_dump(&mut self, ignore_missing_dump: bool) -> &mut Self {
self.ignore_missing_dump = ignore_missing_dump;
self
}
}
impl Meilisearch {
pub fn builder() -> IndexControllerBuilder {
IndexControllerBuilder::default()
}
pub async fn register_task(&self, task: KindWithContent) -> Result<TaskView> {
let this = self.clone();
Ok(
tokio::task::spawn_blocking(move || this.clone().index_scheduler.register(task))
.await??,
)
}
pub async fn list_tasks(&self, filter: index_scheduler::Query) -> Result<Vec<TaskView>> {
Ok(self.index_scheduler.get_tasks(filter)?)
}
pub async fn list_indexes(&self) -> Result<Vec<Index>> {
let this = self.clone();
Ok(spawn_blocking(move || this.index_scheduler.indexes()).await??)
}
/// Return the total number of documents contained in the index + the selected documents.
pub async fn documents(
&self,
uid: String,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<(u64, Vec<Document>)> {
let this = self.clone();
spawn_blocking(move || -> Result<_> {
let index = this.index_scheduler.index(&uid)?;
Ok(index.retrieve_documents(offset, limit, attributes_to_retrieve)?)
})
.await?
}
pub async fn document(
&self,
uid: String,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let this = self.clone();
spawn_blocking(move || -> Result<_> {
let index = this.index_scheduler.index(&uid)?;
Ok(index.retrieve_document(doc_id, attributes_to_retrieve)?)
})
.await?
}
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
let this = self.clone();
spawn_blocking(move || -> Result<_> {
let index = this.index_scheduler.index(&uid)?;
Ok(index.perform_search(query)?)
})
.await?
}
pub async fn get_index(&self, uid: String) -> Result<Index> {
let this = self.clone();
Ok(spawn_blocking(move || this.index_scheduler.index(&uid)).await??)
}
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let processing_tasks = self
.index_scheduler
.get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?;
// Check if the currently indexing update is from our index.
let is_indexing = processing_tasks.first().map_or(false, |task| {
task.index_uid.as_ref().map_or(false, |u| u == &uid)
});
let index = self.get_index(uid).await?;
let mut stats = spawn_blocking(move || index.stats()).await??;
stats.is_indexing = Some(is_indexing);
Ok(stats)
}
pub async fn get_all_stats(&self, search_rules: &SearchRules) -> Result<Stats> {
let mut last_task: Option<OffsetDateTime> = None;
let mut indexes = BTreeMap::new();
let mut database_size = 0;
let processing_tasks = self
.index_scheduler
.get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?;
for index in self.list_indexes().await? {
if !search_rules.is_index_authorized(&index.name) {
continue;
}
let index_name = index.name.clone();
let (mut stats, meta) =
spawn_blocking::<_, Result<(IndexStats, IndexMeta)>>(move || {
Ok((index.stats()?, index.meta()?))
})
.await??;
database_size += stats.size;
last_task = last_task.map_or(Some(meta.updated_at), |last| {
Some(last.max(meta.updated_at))
});
// Check if the currently indexing update is from our index.
stats.is_indexing = processing_tasks
.first()
.and_then(|p| p.index_uid.as_ref().map(|u| u == &index_name))
.or(Some(false));
indexes.insert(index_name, stats);
}
Ok(Stats {
database_size,
last_update: last_task,
indexes,
})
}
}
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
loop {
match Arc::try_unwrap(item) {
Ok(item) => return item,
Err(item_arc) => {
item = item_arc;
sleep(Duration::from_millis(100)).await;
continue;
}
}
}
}
// Clamp the provided value to be a multiple of system page size.
fn clamp_to_page_size(size: usize) -> usize {
size / page_size::get() * page_size::get()
}
/*
TODO: TAMO: uncomment this test
#[cfg(test)]
mod test {
use futures::future::ok;
use mockall::predicate::eq;
use nelson::Mocker;
use index::error::Result as IndexResult;
use index::Index;
use index::{DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG};
use super::*;
#[actix_rt::test]
async fn test_search_simple() {
let index_uid = "test";
let index_uuid = Uuid::new_v4();
let query = SearchQuery {
q: Some(String::from("hello world")),
offset: Some(10),
limit: 0,
attributes_to_retrieve: Some(vec!["string".to_owned()].into_iter().collect()),
attributes_to_crop: None,
crop_length: 18,
attributes_to_highlight: None,
show_matches_position: true,
filter: None,
sort: None,
facets: None,
highlight_pre_tag: DEFAULT_HIGHLIGHT_PRE_TAG(),
highlight_post_tag: DEFAULT_HIGHLIGHT_POST_TAG(),
crop_marker: DEFAULT_CROP_MARKER(),
matching_strategy: Default::default(),
};
let result = SearchResult {
hits: vec![],
estimated_total_hits: 29,
query: "hello world".to_string(),
limit: 24,
offset: 0,
processing_time_ms: 50,
facet_distribution: None,
};
let mut uuid_store = MockIndexMetaStore::new();
uuid_store
.expect_get()
.with(eq(index_uid.to_owned()))
.returning(move |s| {
Box::pin(ok((
s,
Some(crate::index_resolver::meta_store::IndexMeta {
uuid: index_uuid,
creation_task_id: 0,
}),
)))
});
let mut index_store = MockIndexStore::new();
let result_clone = result.clone();
let query_clone = query.clone();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(move |_uuid| {
let result = result_clone.clone();
let query = query_clone.clone();
let mocker = Mocker::default();
mocker
.when::<SearchQuery, IndexResult<SearchResult>>("perform_search")
.once()
.then(move |q| {
assert_eq!(&q, &query);
Ok(result.clone())
});
let index = Index::mock(mocker);
Box::pin(ok(Some(index)))
});
let task_store_mocker = nelson::Mocker::default();
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = Arc::new(IndexResolver::new(
uuid_store,
index_store,
update_file_store.clone(),
));
let task_store = TaskStore::mock(task_store_mocker);
let scheduler = Scheduler::new(
task_store.clone(),
vec![index_resolver.clone()],
SchedulerConfig::default(),
)
.unwrap();
let index_controller =
IndexController::mock(index_resolver, task_store, update_file_store, scheduler);
let r = index_controller
.search(index_uid.to_owned(), query.clone())
.await
.unwrap();
assert_eq!(r, result);
}
}
*/

View file

@ -1,79 +0,0 @@
use std::error::Error;
use std::fmt;
use meilisearch_types::{internal_error, Code, ErrorCode};
use crate::{
document_formats::DocumentFormatError,
index::error::IndexError,
index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat},
};
pub type Result<T> = std::result::Result<T, UpdateLoopError>;
#[derive(Debug, thiserror::Error)]
#[allow(clippy::large_enum_variant)]
pub enum UpdateLoopError {
#[error("Task `{0}` not found.")]
UnexistingUpdate(u64),
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn Error + Send + Sync + 'static>),
#[error(
"update store was shut down due to a fatal error, please check your logs for more info."
)]
FatalUpdateStoreError,
#[error("{0}")]
DocumentFormatError(#[from] DocumentFormatError),
#[error("The provided payload reached the size limit.")]
PayloadTooLarge,
#[error("A {0} payload is missing.")]
MissingPayload(DocumentAdditionFormat),
#[error("{0}")]
IndexError(#[from] IndexError),
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
where
T: Sync + Send + 'static + fmt::Debug,
{
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Internal(Box::new(other))
}
}
impl From<tokio::sync::oneshot::error::RecvError> for UpdateLoopError {
fn from(other: tokio::sync::oneshot::error::RecvError) -> Self {
Self::Internal(Box::new(other))
}
}
impl From<actix_web::error::PayloadError> for UpdateLoopError {
fn from(other: actix_web::error::PayloadError) -> Self {
match other {
actix_web::error::PayloadError::Overflow => Self::PayloadTooLarge,
_ => Self::Internal(Box::new(other)),
}
}
}
internal_error!(
UpdateLoopError: heed::Error,
std::io::Error,
serde_json::Error,
tokio::task::JoinError,
UpdateFileStoreError
);
impl ErrorCode for UpdateLoopError {
fn error_code(&self) -> Code {
match self {
Self::UnexistingUpdate(_) => Code::TaskNotFound,
Self::Internal(_) => Code::Internal,
Self::FatalUpdateStoreError => Code::Internal,
Self::DocumentFormatError(error) => error.error_code(),
Self::PayloadTooLarge => Code::PayloadTooLarge,
Self::MissingPayload(_) => Code::MissingPayload,
Self::IndexError(e) => e.error_code(),
}
}
}

View file

@ -1,19 +0,0 @@
#[derive(thiserror::Error, Debug)]
pub enum VersionFileError {
#[error(
"Meilisearch (v{}) failed to infer the version of the database. Please consider using a dump to load your data.",
env!("CARGO_PKG_VERSION").to_string()
)]
MissingVersionFile,
#[error("Version file is corrupted and thus Meilisearch is unable to determine the version of the database.")]
MalformedVersionFile,
#[error(
"Expected Meilisearch engine version: {major}.{minor}.{patch}, current engine version: {}. To update Meilisearch use a dump.",
env!("CARGO_PKG_VERSION").to_string()
)]
VersionMismatch {
major: String,
minor: String,
patch: String,
},
}

View file

@ -1,56 +0,0 @@
use std::fs;
use std::io::ErrorKind;
use std::path::Path;
use self::error::VersionFileError;
mod error;
pub const VERSION_FILE_NAME: &str = "VERSION";
static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_version_file(db_path: &Path) -> anyhow::Result<()> {
let version_path = db_path.join(VERSION_FILE_NAME);
fs::write(
version_path,
format!("{}.{}.{}", VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH),
)?;
Ok(())
}
// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch.
pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
let version_path = db_path.join(VERSION_FILE_NAME);
match fs::read_to_string(&version_path) {
Ok(version) => {
let version_components = version.split('.').collect::<Vec<_>>();
let (major, minor, patch) = match &version_components[..] {
[major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
_ => return Err(VersionFileError::MalformedVersionFile.into()),
};
if major != VERSION_MAJOR || minor != VERSION_MINOR {
return Err(VersionFileError::VersionMismatch {
major,
minor,
patch,
}
.into());
}
}
Err(error) => {
return match error.kind() {
ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile.into()),
_ => Err(error.into()),
}
}
}
Ok(())
}