2065: MeiliSearch v0.25.0: `stable` -> `main` r=curquiza a=curquiza



Co-authored-by: Clémentine Urquizar <clementine@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com>
Co-authored-by: many <maxime@meilisearch.com>
Co-authored-by: Marin Postma <postma.marin@protonmail.com>
Co-authored-by: Maxime Legendre <maximelegendre@MacBook-Pro-de-Maxime.local>
Co-authored-by: Maxime Legendre <maximelegendre@mbp-de-maxime.home>
Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
bors[bot] 2022-01-11 16:30:22 +00:00 committed by GitHub
commit 5d48f72ade
48 changed files with 879 additions and 320 deletions


@@ -8,7 +8,7 @@ use indexmap::IndexMap;
 use milli::documents::DocumentBatchReader;
 use serde::{Deserialize, Serialize};
-use crate::document_formats::read_ndjson;
+use crate::document_formats::{read_ndjson, DocumentFormatError};
 use crate::index::update_handler::UpdateHandler;
 use crate::index::updates::apply_settings_to_builder;
@@ -128,23 +128,29 @@ impl Index {
         let mut tmp_doc_file = tempfile::tempfile()?;
-        read_ndjson(reader, &mut tmp_doc_file)?;
+        let empty = match read_ndjson(reader, &mut tmp_doc_file) {
+            // if there was no document in the file, it's because the index was empty
+            Ok(_) => false,
+            Err(DocumentFormatError::EmptyPayload(_)) => true,
+            Err(e) => return Err(e.into()),
+        };
-        tmp_doc_file.seek(SeekFrom::Start(0))?;
-        let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
-        //If the document file is empty, we don't perform the document addition, to prevent
-        //a primary key error to be thrown.
-        if !documents_reader.is_empty() {
-            let builder = update_handler
-                .update_builder()
-                .index_documents(&mut txn, &index);
-            builder.execute(documents_reader, |_| ())?;
-        }
+        if !empty {
+            tmp_doc_file.seek(SeekFrom::Start(0))?;
+            let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
+            // If the document file is empty, we don't perform the document addition, to prevent
+            // a primary key error from being thrown.
+            if !documents_reader.is_empty() {
+                let builder = update_handler
+                    .update_builder()
+                    .index_documents(&mut txn, &index);
+                builder.execute(documents_reader, |_| ())?;
+            }
+        }
         txn.commit()?;
         index.prepare_for_closing().wait();
         Ok(())
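
For context, the control flow this hunk introduces can be isolated as the following minimal sketch. It is not the meilisearch-lib code: FormatError stands in for DocumentFormatError, the NDJSON reader is reduced to a line counter, and the tempfile crate is assumed as a dependency.

use std::io::{self, Read, Seek, SeekFrom, Write};

// Stand-in for `DocumentFormatError`: an empty payload gets its own
// variant, so callers can treat it as "nothing to index" rather than
// as a hard failure.
#[derive(Debug)]
enum FormatError {
    EmptyPayload,
    Io(io::Error),
}

// Reduced `read_ndjson`: copy the payload through and report emptiness.
fn read_ndjson(reader: &mut impl Read, out: &mut impl Write) -> Result<usize, FormatError> {
    let mut buf = String::new();
    reader.read_to_string(&mut buf).map_err(FormatError::Io)?;
    let count = buf.lines().filter(|l| !l.trim().is_empty()).count();
    if count == 0 {
        return Err(FormatError::EmptyPayload);
    }
    out.write_all(buf.as_bytes()).map_err(FormatError::Io)?;
    Ok(count)
}

fn main() -> io::Result<()> {
    let mut tmp_doc_file = tempfile::tempfile()?;
    // An empty dump payload is normal for an index with no documents.
    let empty = match read_ndjson(&mut io::empty(), &mut tmp_doc_file) {
        Ok(_) => false,
        Err(FormatError::EmptyPayload) => true,
        Err(FormatError::Io(e)) => return Err(e),
    };
    if !empty {
        // Only rewind and index when there is something to read.
        tmp_doc_file.seek(SeekFrom::Start(0))?;
    }
    Ok(())
}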


@@ -237,7 +237,9 @@ impl Index {
         let mut txn = self.write_txn()?;
         if let Some(primary_key) = primary_key {
-            self.update_primary_key_txn(&mut txn, primary_key)?;
+            if self.primary_key(&txn)?.is_none() {
+                self.update_primary_key_txn(&mut txn, primary_key)?;
+            }
         }
 
         let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
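
Stripped of heed transactions, the intent of this guard is roughly the following sketch; the Index struct here is a simplified stand-in, not the real index type.

// Simplified stand-in: the primary key may or may not already be set
// when a document update arrives.
struct Index {
    primary_key: Option<String>,
}

impl Index {
    fn update_documents(&mut self, primary_key: Option<String>) {
        if let Some(pk) = primary_key {
            // Only adopt the caller's primary key when none exists yet;
            // a document addition must not silently overwrite it.
            if self.primary_key.is_none() {
                self.primary_key = Some(pk);
            }
        }
    }
}

fn main() {
    let mut index = Index { primary_key: Some("id".into()) };
    index.update_documents(Some("uuid".into()));
    // The pre-existing key wins.
    assert_eq!(index.primary_key.as_deref(), Some("id"));
}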


@@ -74,11 +74,13 @@ impl From<Update> for TaskContent {
                 primary_key,
                 // document count is unknown for legacy updates
                 documents_count: 0,
+                allow_index_creation: true,
             },
             Update::Settings(settings) => TaskContent::SettingsUpdate {
                 settings,
                 // There is no way to know now, so we assume it isn't
                 is_deletion: false,
+                allow_index_creation: true,
             },
             Update::ClearDocuments => TaskContent::DocumentDeletion(DocumentDeletion::Clear),
         }


@@ -1,3 +1,4 @@
+use meilisearch_auth::error::AuthControllerError;
 use meilisearch_error::{internal_error, Code, ErrorCode};
 
 use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError};
@@ -24,6 +25,7 @@ internal_error!(
     serde_json::error::Error,
     tempfile::PersistError,
     fs_extra::error::Error,
+    AuthControllerError,
     TaskError
 );
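
A hedged reading of this hunk: macros of the internal_error! shape usually expand to From impls that fold each listed external error type into the error's internal variant, which is what makes ? conversions compile. The variant name and payload below are assumptions, not the actual meilisearch-error definition.

// Assumed expansion shape for one listed type; the real macro may
// differ in variant name and payload.
#[derive(Debug)]
enum DumpActorError {
    Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl From<std::io::Error> for DumpActorError {
    fn from(e: std::io::Error) -> Self {
        // Adding `AuthControllerError` to the macro's list generates
        // this same kind of impl for it.
        DumpActorError::Internal(Box::new(e))
    }
}

fn read_config() -> Result<String, DumpActorError> {
    // `?` works because of the generated `From` impl.
    Ok(std::fs::read_to_string("/nonexistent/config")?)
}

fn main() {
    println!("{:?}", read_config().unwrap_err());
}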


@@ -3,6 +3,7 @@ use std::sync::Arc;
 use heed::EnvOpenOptions;
 use log::info;
+use meilisearch_auth::AuthController;
 
 use crate::analytics;
 use crate::index_controller::dump_actor::Metadata;
@@ -38,6 +39,7 @@ pub fn load_dump(
     )?;
     UpdateFileStore::load_dump(src.as_ref(), &dst)?;
     TaskStore::load_dump(&src, env)?;
+    AuthController::load_dump(&src, &dst)?;
     analytics::copy_user_id(src.as_ref(), dst.as_ref());
 
     info!("Loading indexes.");


@@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
 pub use actor::DumpActor;
 pub use handle_impl::*;
+use meilisearch_auth::AuthController;
 pub use message::DumpMsg;
 use tokio::fs::create_dir_all;
 use tokio::sync::oneshot;
@@ -297,6 +298,8 @@ impl DumpJob {
             .dump(&temp_dump_path, self.update_file_store.clone())
             .await?;
 
+        AuthController::dump(&self.db_path, &temp_dump_path)?;
+
         let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
             // for now we simply copy the updates/updates_files
             // FIXME: We may copy more files than necessary, if new files are added while we are


@@ -59,10 +59,7 @@ impl ErrorCode for IndexControllerError {
             IndexControllerError::DocumentFormatError(e) => e.error_code(),
             IndexControllerError::MissingPayload(_) => Code::MissingPayload,
             IndexControllerError::PayloadTooLarge => Code::PayloadTooLarge,
-            IndexControllerError::DumpError(DumpActorError::DumpAlreadyRunning) => {
-                Code::DumpAlreadyInProgress
-            }
-            IndexControllerError::DumpError(_) => Code::DumpProcessFailed,
+            IndexControllerError::DumpError(e) => e.error_code(),
         }
     }
 }
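
This change follows a common delegation pattern: rather than enumerating the wrapped error's cases at the outer level, the wrapper defers to the inner error's own error_code(). A minimal sketch with simplified names; the string codes below are illustrative, not the actual Code values.

trait ErrorCode {
    fn error_code(&self) -> &'static str;
}

enum DumpActorError {
    DumpAlreadyRunning,
    Internal,
}

impl ErrorCode for DumpActorError {
    fn error_code(&self) -> &'static str {
        // The inner error owns its mapping...
        match self {
            DumpActorError::DumpAlreadyRunning => "dump_already_in_progress",
            DumpActorError::Internal => "dump_process_failed",
        }
    }
}

enum IndexControllerError {
    DumpError(DumpActorError),
}

impl ErrorCode for IndexControllerError {
    fn error_code(&self) -> &'static str {
        // ...so the wrapper only delegates instead of re-mapping.
        match self {
            IndexControllerError::DumpError(e) => e.error_code(),
        }
    }
}

fn main() {
    let err = IndexControllerError::DumpError(DumpActorError::DumpAlreadyRunning);
    assert_eq!(err.error_code(), "dump_already_in_progress");
}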


@@ -119,6 +119,7 @@ pub enum Update {
         settings: Settings<Unchecked>,
         /// Indicates whether the update was a deletion
         is_deletion: bool,
+        allow_index_creation: bool,
     },
     DocumentAddition {
         #[derivative(Debug = "ignore")]
@@ -126,6 +127,7 @@ pub enum Update {
         primary_key: Option<String>,
         method: IndexDocumentsMethod,
         format: DocumentAdditionFormat,
+        allow_index_creation: bool,
     },
     DeleteIndex,
     CreateIndex {
@@ -165,7 +167,11 @@ impl IndexControllerBuilder {
         let db_exists = db_path.as_ref().exists();
         if db_exists {
-            versioning::check_version_file(db_path.as_ref())?;
+            // The directory could have been pre-created without any database in it.
+            let db_is_empty = db_path.as_ref().read_dir()?.next().is_none();
+            if !db_is_empty {
+                versioning::check_version_file(db_path.as_ref())?;
+            }
         }
 
         if let Some(ref path) = self.import_snapshot {
@@ -340,15 +346,18 @@ where
             Update::Settings {
                 settings,
                 is_deletion,
+                allow_index_creation,
             } => TaskContent::SettingsUpdate {
                 settings,
                 is_deletion,
+                allow_index_creation,
             },
             Update::DocumentAddition {
                 mut payload,
                 primary_key,
                 format,
                 method,
+                allow_index_creation,
             } => {
                 let mut buffer = Vec::new();
                 while let Some(bytes) = payload.next().await {
@@ -380,6 +389,7 @@ where
                     merge_strategy: method,
                     primary_key,
                     documents_count,
+                    allow_index_creation,
                 }
             }
             Update::DeleteIndex => TaskContent::IndexDeletion,
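
The version-file guard added above handles a directory that exists but contains no database yet (for example, one pre-created by a Docker volume mount). Isolated into a standalone, std-only sketch with a hypothetical helper name:

use std::io;
use std::path::Path;

// Hypothetical helper isolating the guard above: an existing but
// empty db directory must be treated as "no database yet" and
// skipped by the version check.
fn needs_version_check(db_path: &Path) -> io::Result<bool> {
    if !db_path.exists() {
        return Ok(false);
    }
    let db_is_empty = db_path.read_dir()?.next().is_none();
    Ok(!db_is_empty)
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("empty-db-dir");
    std::fs::create_dir_all(&dir)?;
    // A pre-created empty directory requires no version check.
    assert!(!needs_version_check(&dir)?);
    Ok(())
}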


@@ -1,11 +1,11 @@
 #[derive(thiserror::Error, Debug)]
 pub enum VersionFileError {
-    #[error("Version file is missing or the previous MeiliSearch engine version was below 0.24.0. Use a dump to update Meilisearch.")]
+    #[error("Version file is missing or the previous MeiliSearch engine version was below 0.24.0. Use a dump to update MeiliSearch.")]
     MissingVersionFile,
     #[error("Version file is corrupted and thus MeiliSearch is unable to determine the version of the database.")]
     MalformedVersionFile,
     #[error(
-        "Expected MeiliSearch engine version: {major}.{minor}.{patch}, current engine version: {}. To update Meilisearch use a dump.",
+        "Expected MeiliSearch engine version: {major}.{minor}.{patch}, current engine version: {}. To update MeiliSearch use a dump.",
         env!("CARGO_PKG_VERSION").to_string()
     )]
     VersionMismatch {


@@ -23,7 +23,7 @@ pub fn create_version_file(db_path: &Path) -> anyhow::Result<()> {
     Ok(())
 }
 
-// Ensures Meilisearch version is compatible with the database, returns an error versions mismatch.
+// Ensures the MeiliSearch version is compatible with the database; returns an error on version mismatch.
 pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
     let version_path = db_path.join(VERSION_FILE_NAME);

@@ -187,13 +187,18 @@ where
                 content_uuid,
                 merge_strategy,
                 primary_key,
+                allow_index_creation,
                 ..
             } => {
                 let primary_key = primary_key.clone();
                 let content_uuid = *content_uuid;
                 let method = *merge_strategy;
 
-                let index = self.get_or_create_index(index_uid, task.id).await?;
+                let index = if *allow_index_creation {
+                    self.get_or_create_index(index_uid, task.id).await?
+                } else {
+                    self.get_index(index_uid.into_inner()).await?
+                };
                 let file_store = self.file_store.clone();
                 let result = spawn_blocking(move || {
                     index.update_documents(method, content_uuid, primary_key, file_store)
@@ -226,8 +231,9 @@ where
             TaskContent::SettingsUpdate {
                 settings,
                 is_deletion,
+                allow_index_creation,
             } => {
-                let index = if *is_deletion {
+                let index = if *is_deletion || !*allow_index_creation {
                     self.get_index(index_uid.into_inner()).await?
                 } else {
                     self.get_or_create_index(index_uid, task.id).await?
@@ -502,8 +508,8 @@ mod test {
             match &task.content {
                 // a nonexistent index should trigger an index creation in the following cases:
-                TaskContent::DocumentAddition { .. }
-                | TaskContent::SettingsUpdate { is_deletion: false, .. }
+                TaskContent::DocumentAddition { allow_index_creation: true, .. }
+                | TaskContent::SettingsUpdate { allow_index_creation: true, is_deletion: false, .. }
                 | TaskContent::IndexCreation { .. } if !index_exists => {
                     index_store
                         .expect_create()
@@ -565,6 +571,8 @@ mod test {
                 || (!index_exists && matches!(task.content, TaskContent::IndexDeletion
                     | TaskContent::DocumentDeletion(_)
                     | TaskContent::SettingsUpdate { is_deletion: true, ..}
+                    | TaskContent::SettingsUpdate { allow_index_creation: false, ..}
+                    | TaskContent::DocumentAddition { allow_index_creation: false, ..}
                     | TaskContent::IndexUpdate { .. } ))
             {
                 assert!(result.is_err(), "{:?}", result);
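
The rule the scheduler change and the updated tests encode can be summarized in one function. This is a simplified mirror of the dispatch above, not the index_resolver API; TaskKind is a reduced stand-in for TaskContent.

// Reduced mirror of TaskContent, carrying only the fields that decide
// whether a missing index may be created on the fly.
enum TaskKind {
    DocumentAddition { allow_index_creation: bool },
    SettingsUpdate { is_deletion: bool, allow_index_creation: bool },
    IndexCreation,
    IndexDeletion,
}

// A nonexistent index is created only for tasks that explicitly allow
// it; deletions and gated updates must fail instead.
fn may_create_index(task: &TaskKind) -> bool {
    match task {
        TaskKind::DocumentAddition { allow_index_creation } => *allow_index_creation,
        TaskKind::SettingsUpdate { is_deletion, allow_index_creation } => {
            !is_deletion && *allow_index_creation
        }
        TaskKind::IndexCreation => true,
        TaskKind::IndexDeletion => false,
    }
}

fn main() {
    assert!(!may_create_index(&TaskKind::DocumentAddition { allow_index_creation: false }));
    assert!(may_create_index(&TaskKind::SettingsUpdate { is_deletion: false, allow_index_creation: true }));
}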


@@ -107,6 +107,7 @@ impl SnapshotJob {
         self.snapshot_meta_env(temp_snapshot_path)?;
         self.snapshot_file_store(temp_snapshot_path)?;
         self.snapshot_indexes(temp_snapshot_path)?;
+        self.snapshot_auth(temp_snapshot_path)?;
 
         let db_name = self
             .src_path
@@ -190,4 +191,18 @@ impl SnapshotJob {
         Ok(())
     }
 
+    fn snapshot_auth(&self, path: &Path) -> anyhow::Result<()> {
+        let auth_path = self.src_path.join("auth");
+        let dst = path.join("auth");
+        std::fs::create_dir_all(&dst)?;
+        let dst = dst.join("data.mdb");
+
+        let mut options = heed::EnvOpenOptions::new();
+        options.map_size(1_073_741_824);
+        let env = options.open(auth_path)?;
+        env.copy_to_path(dst, heed::CompactionOption::Enabled)?;
+
+        Ok(())
+    }
 }
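
The auth snapshot relies on LMDB's compacting copy: with CompactionOption::Enabled, the environment is rewritten without free pages while copying, so the snapshot's data.mdb comes out as small as possible. A standalone sketch using only the heed calls that appear in the hunk above; the function name and paths are placeholders.

use std::path::Path;

// Copy an existing heed/LMDB environment into a snapshot file,
// mirroring the snapshot_auth body above.
fn snapshot_env(src_env_dir: &Path, dst_file: &Path) -> anyhow::Result<()> {
    let mut options = heed::EnvOpenOptions::new();
    options.map_size(1_073_741_824); // 1 GiB map size, as above
    let env = options.open(src_env_dir)?;
    // Enabled compaction rewrites the B-tree while copying, dropping
    // free pages from the output file.
    env.copy_to_path(dst_file, heed::CompactionOption::Enabled)?;
    Ok(())
}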


@@ -55,7 +55,7 @@ pub enum TaskEvent {
     },
 }
 
-/// A task represents an operation that Meilisearch must do.
+/// A task represents an operation that MeiliSearch must do.
 /// It's stored on disk and executed from the lowest to highest Task id.
 /// Every time a new task is created, it has a higher Task id than the previous one.
 /// See also `Job`.
@@ -91,7 +91,7 @@ impl Task {
 /// A job is like a volatile priority `Task`.
 /// It should be processed as fast as possible and is not stored on disk.
-/// This means, when Meilisearch is closed all your unprocessed jobs will disappear.
+/// This means that when MeiliSearch is closed, all your unprocessed jobs will disappear.
 #[derive(Debug, derivative::Derivative)]
 #[derivative(PartialEq)]
 pub enum Job {
@@ -134,12 +134,14 @@ pub enum TaskContent {
         merge_strategy: IndexDocumentsMethod,
         primary_key: Option<String>,
         documents_count: usize,
+        allow_index_creation: bool,
     },
     DocumentDeletion(DocumentDeletion),
     SettingsUpdate {
         settings: Settings<Unchecked>,
         /// Indicates whether the task was a deletion
         is_deletion: bool,
+        allow_index_creation: bool,
     },
     IndexDeletion,
     IndexCreation {


@@ -87,7 +87,7 @@ impl Store {
     /// This function should be called *right after* creating the store.
     /// It puts back all unfinished updates in the `Created` state. This
     /// allows us to re-enqueue an update that didn't have the time to finish
-    /// when Meilisearch closed.
+    /// when MeiliSearch closed.
     pub fn reset_and_return_unfinished_tasks(&mut self) -> Result<BinaryHeap<Pending<TaskId>>> {
         let mut unfinished_tasks: BinaryHeap<Pending<TaskId>> = BinaryHeap::new();