Merge branch 'main' into stable

Clémentine Urquizar - curqui 2022-01-26 20:17:41 +01:00 committed by GitHub
commit aa50fcb1f0
47 changed files with 1065 additions and 922 deletions

View file

@@ -1,14 +1,12 @@
[package]
name = "meilisearch-lib"
version = "0.25.2"
edition = "2018"
resolver = "2"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = { version = "4.0.0-beta.9", features = ["rustls"] }
actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "39d8006", optional = true }
actix-web = { version = "4.0.0-beta.21", default-features = false }
anyhow = { version = "1.0.43", features = ["backtrace"] }
async-stream = "0.3.2"
async-trait = "0.1.51"
@@ -43,7 +41,7 @@ serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.67", features = ["preserve_order"] }
siphasher = "0.3.7"
slice-group-by = "0.2.6"
structopt = "0.3.23"
clap = { version = "3.0", features = ["derive", "env"] }
tar = "0.4.37"
tempfile = "3.2.0"
thiserror = "1.0.28"

View file

@@ -32,8 +32,6 @@ pub enum DocumentFormatError {
Box<dyn std::error::Error + Send + Sync + 'static>,
PayloadType,
),
#[error("The `{0}` payload must contain at least one document.")]
EmptyPayload(PayloadType),
}
impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError {
@@ -50,7 +48,6 @@ impl ErrorCode for DocumentFormatError {
match self {
DocumentFormatError::Internal(_) => Code::Internal,
DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
DocumentFormatError::EmptyPayload(_) => Code::MalformedPayload,
}
}
}
@@ -63,10 +60,6 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let builder =
DocumentBatchBuilder::from_csv(input, writer).map_err(|e| (PayloadType::Csv, e))?;
if builder.len() == 0 {
return Err(DocumentFormatError::EmptyPayload(PayloadType::Csv));
}
let count = builder.finish().map_err(|e| (PayloadType::Csv, e))?;
Ok(count)
@@ -81,16 +74,17 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize>
let mut buf = String::new();
while reader.read_line(&mut buf)? > 0 {
// skip empty lines
if buf == "\n" {
buf.clear();
continue;
}
builder
.extend_from_json(Cursor::new(&buf.as_bytes()))
.map_err(|e| (PayloadType::Ndjson, e))?;
buf.clear();
}
if builder.len() == 0 {
return Err(DocumentFormatError::EmptyPayload(PayloadType::Ndjson));
}
let count = builder.finish().map_err(|e| (PayloadType::Ndjson, e))?;
Ok(count)
@@ -104,10 +98,6 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
.extend_from_json(input)
.map_err(|e| (PayloadType::Json, e))?;
if builder.len() == 0 {
return Err(DocumentFormatError::EmptyPayload(PayloadType::Json));
}
let count = builder.finish().map_err(|e| (PayloadType::Json, e))?;
Ok(count)
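
With the `EmptyPayload` variant removed, the three readers (`read_csv`, `read_json`, `read_ndjson`) now report an empty payload as `Ok(0)` and leave the interpretation to the caller, as the dump import code below does with its `Ok(0) => true` arm. A minimal, self-contained sketch of that calling pattern (the `read_payload` helper is a toy stand-in, not the Meilisearch API):

// Toy stand-in for read_ndjson & friends: counts non-empty lines and
// returns Ok(0) instead of a dedicated "empty payload" error.
fn read_payload(input: &str) -> Result<usize, String> {
    Ok(input.lines().filter(|l| !l.trim().is_empty()).count())
}

fn main() {
    // Callers branch on the count, mirroring the Ok(0) arm in this commit.
    let empty = match read_payload("") {
        Ok(0) => true,   // the payload contained no documents
        Ok(_) => false,  // at least one document was read
        Err(e) => panic!("malformed payload: {e}"),
    };
    assert!(empty);
}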

View file

@@ -8,7 +8,7 @@ use indexmap::IndexMap;
use milli::documents::DocumentBatchReader;
use serde::{Deserialize, Serialize};
use crate::document_formats::{read_ndjson, DocumentFormatError};
use crate::document_formats::read_ndjson;
use crate::index::update_handler::UpdateHandler;
use crate::index::updates::apply_settings_to_builder;
@@ -128,8 +128,8 @@ impl Index {
let empty = match read_ndjson(reader, &mut tmp_doc_file) {
// if there was no document in the file it's because the index was empty
Ok(0) => true,
Ok(_) => false,
Err(DocumentFormatError::EmptyPayload(_)) => true,
Err(e) => return Err(e.into()),
};

View file

@@ -877,7 +877,7 @@ mod test {
assert_eq!(value["publication_year"], "<em>1937</em>");
}
/// https://github.com/meilisearch/MeiliSearch/issues/1368
/// https://github.com/meilisearch/meilisearch/issues/1368
#[test]
fn formatted_with_highlight_emoji() {
let stop_words = fst::Set::default();

View file

@@ -1,4 +1,5 @@
use std::path::Path;
use std::sync::Arc;
use heed::EnvOpenOptions;
use log::info;
@@ -27,7 +28,7 @@ pub fn load_dump(
let mut options = EnvOpenOptions::new();
options.map_size(meta_env_size);
options.max_dbs(100);
let env = options.open(&dst)?;
let env = Arc::new(options.open(&dst)?);
IndexResolver::load_dump(
src.as_ref(),

View file

@@ -1,14 +1,16 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use anyhow::bail;
use chrono::{DateTime, Utc};
use log::{info, trace, warn};
use log::{info, trace};
use serde::{Deserialize, Serialize};
pub use actor::DumpActor;
pub use handle_impl::*;
use meilisearch_auth::AuthController;
pub use message::DumpMsg;
use tempfile::TempDir;
use tokio::fs::create_dir_all;
use tokio::sync::oneshot;
@@ -79,6 +81,47 @@ pub enum MetadataVersion {
}
impl MetadataVersion {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
match self {
MetadataVersion::V1(_meta) => {
anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")
}
MetadataVersion::V2(meta) => v2::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V3(meta) => v3::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V4(meta) => v4::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
}
Ok(())
}
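
This dispatch works because `MetadataVersion` is a serde-tagged enum, so the right variant is selected while parsing the dump's `metadata.json`. A hedged sketch of the mechanism (the `dumpVersion` tag and the variant payloads are assumptions, not the exact Meilisearch schema; assumes serde and serde_json):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(tag = "dumpVersion")]
enum MetadataVersion {
    V2 { db_version: String },
    V3 { db_version: String },
    V4 { db_version: String },
}

fn main() {
    let json = r#"{ "dumpVersion": "V3", "db_version": "0.25.2" }"#;
    // serde picks the variant from the tag; load_dump then matches on it.
    let meta: MetadataVersion = serde_json::from_str(json).unwrap();
    println!("{meta:?}");
}
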
pub fn new_v4(index_db_size: usize, update_db_size: usize) -> Self {
let meta = Metadata::new(index_db_size, update_db_size);
Self::V4(meta)
@@ -160,10 +203,46 @@ impl DumpInfo {
pub fn load_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
ignore_dump_if_db_exists: bool,
ignore_missing_dump: bool,
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
let empty_db = crate::is_empty_db(&dst_path);
let src_path_exists = src_path.as_ref().exists();
if empty_db && src_path_exists {
let (tmp_src, tmp_dst, meta) = extract_dump(&dst_path, &src_path)?;
meta.load_dump(
tmp_src.path(),
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?;
persist_dump(&dst_path, tmp_dst)?;
Ok(())
} else if !empty_db && !ignore_dump_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
dst_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| dst_path.as_ref().to_owned())
)
} else if !src_path_exists && !ignore_missing_dump {
bail!("dump doesn't exist at {:?}", src_path.as_ref())
} else {
// there is nothing to do
Ok(())
}
}
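
The branching above reduces to a small decision table over `empty_db` and `src_path_exists`, with the two `ignore_*` flags downgrading the error cases to no-ops. An illustrative, self-contained restatement (the function is a sketch, not Meilisearch code):

// Sketch of the four load_dump outcomes shown above.
fn dump_action(
    empty_db: bool,
    dump_exists: bool,
    ignore_dump_if_db_exists: bool,
    ignore_missing_dump: bool,
) -> &'static str {
    match (empty_db, dump_exists) {
        (true, true) => "extract the dump and persist it into the db path",
        (false, _) if !ignore_dump_if_db_exists => "error: database already exists",
        (_, false) if !ignore_missing_dump => "error: dump doesn't exist",
        _ => "nothing to do",
    }
}

fn main() {
    // A non-empty db with the ignore flag set is silently skipped.
    assert_eq!(dump_action(false, true, true, false), "nothing to do");
}
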
fn extract_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
) -> anyhow::Result<(TempDir, TempDir, MetadataVersion)> {
// Set up a temp directory path in the same path as the database, to prevent cross-device
// references.
let temp_path = dst_path
@@ -186,7 +265,11 @@ pub fn load_dump(
let mut meta_file = File::open(&meta_path)?;
let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?;
let tmp_dst = tempfile::tempdir()?;
if !dst_path.as_ref().exists() {
std::fs::create_dir_all(dst_path.as_ref())?;
}
let tmp_dst = tempfile::tempdir_in(dst_path.as_ref())?;
info!(
"Loading dump {}, dump database version: {}, dump version: {}",
@@ -197,43 +280,37 @@ pub fn load_dump(
meta.version()
);
match meta {
MetadataVersion::V1(_meta) => {
anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")
}
MetadataVersion::V2(meta) => v2::load_dump(
meta,
&tmp_src_path,
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?,
MetadataVersion::V3(meta) => v3::load_dump(
meta,
&tmp_src_path,
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?,
MetadataVersion::V4(meta) => v4::load_dump(
meta,
&tmp_src_path,
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?,
}
// Persist and atomically rename the db
Ok((tmp_src, tmp_dst, meta))
}
fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<()> {
let persisted_dump = tmp_dst.into_path();
// Delete everything in the `data.ms` except the tempdir.
if dst_path.as_ref().exists() {
warn!("Overwriting database at {}", dst_path.as_ref().display());
std::fs::remove_dir_all(&dst_path)?;
for file in dst_path.as_ref().read_dir().unwrap() {
let file = file.unwrap().path();
if file.file_name() == persisted_dump.file_name() {
continue;
}
if file.is_file() {
std::fs::remove_file(&file)?;
} else {
std::fs::remove_dir_all(&file)?;
}
}
}
std::fs::rename(&persisted_dump, &dst_path)?;
// Move the whole content of the tempdir into the `data.ms`.
for file in persisted_dump.read_dir().unwrap() {
let file = file.unwrap().path();
std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?;
}
// Delete the empty tempdir.
std::fs::remove_dir_all(&persisted_dump)?;
Ok(())
}
@@ -281,6 +358,7 @@ impl DumpJob {
AuthController::dump(&self.db_path, &temp_dump_path)?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let _ = &self;
// for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out.

View file

@@ -150,6 +150,8 @@ pub struct IndexControllerBuilder {
schedule_snapshot: bool,
dump_src: Option<PathBuf>,
dump_dst: Option<PathBuf>,
ignore_dump_if_db_exists: bool,
ignore_missing_dump: bool,
}
impl IndexControllerBuilder {
@@ -186,6 +188,8 @@ impl IndexControllerBuilder {
load_dump(
db_path.as_ref(),
src_path,
self.ignore_dump_if_db_exists,
self.ignore_missing_dump,
index_size,
task_store_size,
&indexer_options,
@@ -198,7 +202,7 @@
options.map_size(task_store_size);
options.max_dbs(20);
let meta_env = options.open(&db_path)?;
let meta_env = Arc::new(options.open(&db_path)?);
let update_file_store = UpdateFileStore::new(&db_path)?;
// Create or overwrite the version file for this DB
@@ -296,18 +300,6 @@ impl IndexControllerBuilder {
self
}
/// Set the index controller builder's dump src.
pub fn set_dump_src(&mut self, dump_src: PathBuf) -> &mut Self {
self.dump_src.replace(dump_src);
self
}
/// Set the index controller builder's dump dst.
pub fn set_dump_dst(&mut self, dump_dst: PathBuf) -> &mut Self {
self.dump_dst.replace(dump_dst);
self
}
/// Set the index controller builder's import snapshot.
pub fn set_import_snapshot(&mut self, import_snapshot: PathBuf) -> &mut Self {
self.import_snapshot.replace(import_snapshot);
@@ -325,6 +317,30 @@ impl IndexControllerBuilder {
self.schedule_snapshot = true;
self
}
/// Set the index controller builder's dump src.
pub fn set_dump_src(&mut self, dump_src: PathBuf) -> &mut Self {
self.dump_src.replace(dump_src);
self
}
/// Set the index controller builder's dump dst.
pub fn set_dump_dst(&mut self, dump_dst: PathBuf) -> &mut Self {
self.dump_dst.replace(dump_dst);
self
}
/// Set the index controller builder's ignore dump if db exists.
pub fn set_ignore_dump_if_db_exists(&mut self, ignore_dump_if_db_exists: bool) -> &mut Self {
self.ignore_dump_if_db_exists = ignore_dump_if_db_exists;
self
}
/// Set the index controller builder's ignore missing dump.
pub fn set_ignore_missing_dump(&mut self, ignore_missing_dump: bool) -> &mut Self {
self.ignore_missing_dump = ignore_missing_dump;
self
}
}
impl<U, I> IndexController<U, I>

View file

@@ -1,11 +1,14 @@
#[derive(thiserror::Error, Debug)]
pub enum VersionFileError {
#[error("Version file is missing or the previous MeiliSearch engine version was below 0.24.0. Use a dump to update MeiliSearch.")]
#[error(
"Meilisearch (v{}) failed to infer the version of the database. Please consider using a dump to load your data.",
env!("CARGO_PKG_VERSION").to_string()
)]
MissingVersionFile,
#[error("Version file is corrupted and thus MeiliSearch is unable to determine the version of the database.")]
#[error("Version file is corrupted and thus Meilisearch is unable to determine the version of the database.")]
MalformedVersionFile,
#[error(
"Expected MeiliSearch engine version: {major}.{minor}.{patch}, current engine version: {}. To update MeiliSearch use a dump.",
"Expected Meilisearch engine version: {major}.{minor}.{patch}, current engine version: {}. To update Meilisearch use a dump.",
env!("CARGO_PKG_VERSION").to_string()
)]
VersionMismatch {

View file

@@ -12,7 +12,7 @@ static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
// Persists the version of the current MeiliSearch binary to a VERSION file
// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_version_file(db_path: &Path) -> anyhow::Result<()> {
let version_path = db_path.join(VERSION_FILE_NAME);
fs::write(
@@ -23,7 +23,7 @@ pub fn create_version_file(db_path: &Path) -> anyhow::Result<()> {
Ok(())
}
// Ensures MeiliSearch version is compatible with the database, returns an error if versions mismatch.
// Ensures Meilisearch version is compatible with the database, returns an error if versions mismatch.
pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
let version_path = db_path.join(VERSION_FILE_NAME);
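
Together, the two helpers form a simple round trip: persist `major.minor.patch` when the database is created, then compare it on startup. A self-contained sketch of that contract (the constant name mirrors the diff; error handling is simplified to `std::io::Result`, where the real functions use `anyhow` and `VersionFileError`):

use std::fs;
use std::path::Path;

const VERSION_FILE_NAME: &str = "VERSION";

// "major.minor.patch" of the running binary, read at compile time.
fn expected_version() -> String {
    format!(
        "{}.{}.{}",
        env!("CARGO_PKG_VERSION_MAJOR"),
        env!("CARGO_PKG_VERSION_MINOR"),
        env!("CARGO_PKG_VERSION_PATCH")
    )
}

fn create_version_file(db_path: &Path) -> std::io::Result<()> {
    fs::write(db_path.join(VERSION_FILE_NAME), expected_version())
}

// A mismatch means the db was created by another Meilisearch version
// and must be migrated through a dump.
fn check_version_file(db_path: &Path) -> std::io::Result<bool> {
    let on_disk = fs::read_to_string(db_path.join(VERSION_FILE_NAME))?;
    Ok(on_disk.trim() == expected_version())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    create_version_file(&dir)?;
    assert!(check_version_file(&dir)?);
    Ok(())
}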

View file

@@ -2,6 +2,7 @@ use std::collections::HashSet;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use heed::types::{SerdeBincode, Str};
use heed::{CompactionOption, Database, Env};
@@ -42,12 +43,20 @@ pub struct IndexMeta {
#[derive(Clone)]
pub struct HeedMetaStore {
env: Env,
env: Arc<Env>,
db: Database<Str, SerdeBincode<IndexMeta>>,
}
impl Drop for HeedMetaStore {
fn drop(&mut self) {
if Arc::strong_count(&self.env) == 1 {
self.env.as_ref().clone().prepare_for_closing();
}
}
}
impl HeedMetaStore {
pub fn new(env: heed::Env) -> Result<Self> {
pub fn new(env: Arc<heed::Env>) -> Result<Self> {
let db = env.create_database(Some("uuids"))?;
Ok(Self { env, db })
}
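
The same `Arc<Env>` + `Drop` pattern appears again in the task `Store` later in this commit: every store clones one shared LMDB environment, and whichever clone is dropped last closes it. A minimal, runnable sketch of the idea (`FakeEnv` is a stand-in, since `heed::Env::prepare_for_closing` consumes an owned handle):

use std::sync::Arc;

// Stand-in for heed::Env: a cloneable handle with a consuming close.
#[derive(Clone)]
struct FakeEnv;

impl FakeEnv {
    fn prepare_for_closing(self) {
        println!("environment closed");
    }
}

struct Store {
    env: Arc<FakeEnv>,
}

impl Drop for Store {
    fn drop(&mut self) {
        // Only the last owner closes: clone an owned handle out of the Arc
        // and consume it, exactly as the HeedMetaStore Drop impl above does.
        if Arc::strong_count(&self.env) == 1 {
            self.env.as_ref().clone().prepare_for_closing();
        }
    }
}

fn main() {
    let env = Arc::new(FakeEnv);
    let store = Store { env: env.clone() };
    drop(env);   // `store` now holds the last reference
    drop(store); // strong_count == 1 here, so the env is closed
}
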
@@ -144,7 +153,7 @@ impl HeedMetaStore {
Ok(())
}
pub fn load_dump(src: impl AsRef<Path>, env: heed::Env) -> Result<()> {
pub fn load_dump(src: impl AsRef<Path>, env: Arc<heed::Env>) -> Result<()> {
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);

View file

@@ -4,6 +4,7 @@ pub mod meta_store;
use std::convert::TryInto;
use std::path::Path;
use std::sync::Arc;
use chrono::Utc;
use error::{IndexResolverError, Result};
@@ -16,13 +17,11 @@ use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
use uuid::Uuid;
use crate::index::update_handler::UpdateHandler;
use crate::index::{error::Result as IndexResult, Index};
use crate::index::{error::Result as IndexResult, update_handler::UpdateHandler, Index};
use crate::options::IndexerOpts;
use crate::tasks::batch::Batch;
use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::tasks::Pending;
use crate::tasks::TaskPerformer;
use crate::tasks::{Pending, TaskPerformer};
use crate::update_file_store::UpdateFileStore;
use self::meta_store::IndexMeta;
@@ -39,7 +38,7 @@ pub fn create_index_resolver(
path: impl AsRef<Path>,
index_size: usize,
indexer_opts: &IndexerOpts,
meta_env: heed::Env,
meta_env: Arc<heed::Env>,
file_store: UpdateFileStore,
) -> anyhow::Result<HardStateIndexResolver> {
let uuid_store = HeedMetaStore::new(meta_env)?;
@@ -153,7 +152,7 @@ impl IndexResolver<HeedMetaStore, MapIndexStore> {
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
env: Env,
env: Arc<Env>,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
HeedMetaStore::load_dump(&src, env)?;

View file

@@ -10,6 +10,8 @@ mod snapshot;
pub mod tasks;
mod update_file_store;
use std::path::Path;
pub use index_controller::MeiliSearch;
pub use milli;
@@ -33,3 +35,19 @@ impl EnvSizer for heed::Env {
.fold(0, |acc, m| acc + m.len())
}
}
/// Check if a db is empty. It does not provide any information on the
/// validity of the data in it.
/// We consider a database as non-empty when it's a non-empty directory.
pub fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
let db_path = db_path.as_ref();
if !db_path.exists() {
true
// if we encounter an error or if the db is a file we consider the db non-empty
} else if let Ok(dir) = db_path.read_dir() {
dir.count() == 0
} else {
false
}
}
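
This helper is what lets dump and snapshot import proceed when `data.ms` exists but is an empty directory (as a freshly mounted volume typically is), where the earlier `db_path.exists()` check refused. A quick, self-contained exercise of its contract (the body is reproduced so the snippet runs standalone):

use std::fs;
use std::path::Path;

// Reproduction of is_empty_db above, kept in sync for a runnable example.
fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
    let db_path = db_path.as_ref();
    if !db_path.exists() {
        true
    } else if let Ok(dir) = db_path.read_dir() {
        dir.count() == 0
    } else {
        false
    }
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("is_empty_db_demo");
    let _ = fs::remove_dir_all(&dir);
    assert!(is_empty_db(&dir)); // a missing path counts as empty
    fs::create_dir_all(&dir)?;
    assert!(is_empty_db(&dir)); // an empty directory counts as empty
    fs::write(dir.join("VERSION"), "0.25.2")?;
    assert!(!is_empty_db(&dir)); // any entry makes it non-empty
    fs::remove_dir_all(&dir)
}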

View file

@@ -2,19 +2,19 @@ use core::fmt;
use std::{ops::Deref, str::FromStr};
use byte_unit::{Byte, ByteError};
use clap::Parser;
use milli::CompressionType;
use structopt::StructOpt;
use sysinfo::{RefreshKind, System, SystemExt};
#[derive(Debug, Clone, StructOpt)]
#[derive(Debug, Clone, Parser)]
pub struct IndexerOpts {
/// The amount of documents to skip before printing
/// a log regarding the indexing advancement.
#[structopt(long, default_value = "100000")] // 100k
#[clap(long, default_value = "100000")] // 100k
pub log_every_n: usize,
/// Grenad max number of chunks in bytes.
#[structopt(long)]
#[clap(long)]
pub max_nb_chunks: Option<usize>,
/// The maximum amount of memory the indexer will use. It defaults to 2/3
@@ -24,22 +24,22 @@ pub struct IndexerOpts {
/// In case the engine is unable to retrieve the available memory, the engine will
/// try to use the memory it needs but without a real limit; this can lead to
/// Out-Of-Memory issues, and it is recommended to specify the amount of memory to use.
#[structopt(long, default_value)]
#[clap(long, default_value_t)]
pub max_memory: MaxMemory,
/// The name of the compression algorithm to use when compressing intermediate
/// Grenad chunks while indexing documents.
///
/// Choosing a fast algorithm will make the indexing faster but may consume more memory.
#[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
#[clap(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
pub chunk_compression_type: CompressionType,
/// The level of compression of the chosen algorithm.
#[structopt(long, requires = "chunk-compression-type")]
#[clap(long, requires = "chunk-compression-type")]
pub chunk_compression_level: Option<u32>,
/// Number of parallel jobs for indexing, defaults to # of CPUs.
#[structopt(long)]
#[clap(long)]
pub indexing_jobs: Option<usize>,
}
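
The `structopt` 0.3 to `clap` 3 port shown here (and in the Cargo.toml hunk above, which enables clap's `derive` and `env` features) is mostly mechanical: `#[derive(StructOpt)]` becomes `#[derive(Parser)]`, `#[structopt(...)]` becomes `#[clap(...)]`, and a bare `default_value` on a type with `Default`/`Display` impls becomes `default_value_t`. A minimal sketch with a hypothetical options struct:

use clap::Parser;

// Hypothetical options struct illustrating the migrated attributes.
#[derive(Debug, Parser)]
struct Opts {
    // #[structopt(long, default_value = "100000")] becomes:
    #[clap(long, default_value = "100000")]
    log_every_n: usize,

    // #[structopt(long, default_value)] becomes default_value_t, which
    // renders the default through the value's Display impl.
    #[clap(long, default_value_t = 4)]
    indexing_jobs: usize,
}

fn main() {
    let opts = Opts::parse();
    println!("{opts:?}");
}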

View file

@@ -49,7 +49,10 @@ pub fn load_snapshot(
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool,
) -> anyhow::Result<()> {
if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() {
let empty_db = crate::is_empty_db(&db_path);
let snapshot_path_exists = snapshot_path.as_ref().exists();
if empty_db && snapshot_path_exists {
match from_tar_gz(snapshot_path, &db_path) {
Ok(()) => Ok(()),
Err(e) => {
@@ -58,7 +61,7 @@
Err(e)
}
}
} else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists {
} else if !empty_db && !ignore_snapshot_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
db_path
@@ -66,14 +69,8 @@
.canonicalize()
.unwrap_or_else(|_| db_path.as_ref().to_owned())
)
} else if !snapshot_path.as_ref().exists() && !ignore_missing_snapshot {
bail!(
"snapshot doesn't exist at {:?}",
snapshot_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| snapshot_path.as_ref().to_owned())
)
} else if !snapshot_path_exists && !ignore_missing_snapshot {
bail!("snapshot doesn't exist at {:?}", snapshot_path.as_ref())
} else {
Ok(())
}

View file

@@ -32,7 +32,7 @@ pub trait TaskPerformer: Sync + Send + 'static {
async fn finish(&self, batch: &Batch);
}
pub fn create_task_store<P>(env: heed::Env, performer: Arc<P>) -> Result<TaskStore>
pub fn create_task_store<P>(env: Arc<heed::Env>, performer: Arc<P>) -> Result<TaskStore>
where
P: TaskPerformer,
{

View file

@@ -55,7 +55,7 @@ pub enum TaskEvent {
},
}
/// A task represents an operation that MeiliSearch must do.
/// A task represents an operation that Meilisearch must do.
/// It's stored on disk and executed from the lowest to highest Task id.
/// Every time a new task is created, it has a higher Task id than the previous one.
/// See also `Job`.
@@ -91,7 +91,7 @@ impl Task {
/// A job is like a volatile priority `Task`.
/// It should be processed as fast as possible and is not stored on disk.
/// This means that when MeiliSearch is closed, all your unprocessed jobs will disappear.
/// This means that when Meilisearch is closed, all your unprocessed jobs will disappear.
#[derive(Debug, derivative::Derivative)]
#[derivative(PartialEq)]
pub enum Job {

View file

@@ -114,7 +114,7 @@ impl Clone for TaskStore {
}
impl TaskStore {
pub fn new(env: heed::Env) -> Result<Self> {
pub fn new(env: Arc<heed::Env>) -> Result<Self> {
let mut store = Store::new(env)?;
let unfinished_tasks = store.reset_and_return_unfinished_tasks()?;
let store = Arc::new(store);
@@ -293,7 +293,7 @@ impl TaskStore {
Ok(())
}
pub fn load_dump(src: impl AsRef<Path>, env: Env) -> anyhow::Result<()> {
pub fn load_dump(src: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> {
// create a dummy update field store, since it is not needed right now.
let store = Self::new(env.clone())?;
@@ -340,7 +340,7 @@ pub mod test {
}
impl MockTaskStore {
pub fn new(env: heed::Env) -> Result<Self> {
pub fn new(env: Arc<heed::Env>) -> Result<Self> {
Ok(Self::Real(TaskStore::new(env)?))
}
@@ -432,7 +432,7 @@
}
}
pub fn load_dump(path: impl AsRef<Path>, env: Env) -> anyhow::Result<()> {
pub fn load_dump(path: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> {
TaskStore::load_dump(path, env)
}
}

View file

@@ -10,6 +10,7 @@ use std::convert::TryInto;
use std::mem::size_of;
use std::ops::Range;
use std::result::Result as StdResult;
use std::sync::Arc;
use heed::types::{ByteSlice, OwnedType, SerdeJson, Unit};
use heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn};
@@ -53,18 +54,26 @@ impl<'a> BytesDecode<'a> for IndexUidTaskIdCodec {
}
pub struct Store {
env: Env,
env: Arc<Env>,
uids_task_ids: Database<IndexUidTaskIdCodec, Unit>,
tasks: Database<OwnedType<BEU64>, SerdeJson<Task>>,
}
impl Drop for Store {
fn drop(&mut self) {
if Arc::strong_count(&self.env) == 1 {
self.env.as_ref().clone().prepare_for_closing();
}
}
}
impl Store {
/// Create a new store from the specified `Path`.
/// Be really cautious when calling this function, the returned `Store` may
/// be in an invalid state, with dangling processing tasks.
/// You want to patch all un-finished tasks and put them in your pending
/// queue with the `reset_and_return_unfinished_update` method.
pub fn new(env: heed::Env) -> Result<Self> {
pub fn new(env: Arc<heed::Env>) -> Result<Self> {
let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?;
let tasks = env.create_database(Some(TASKS))?;
@@ -78,7 +87,7 @@ impl Store {
/// This function should be called *right after* creating the store.
/// It puts back all unfinished updates in the `Created` state. This
/// allows us to re-enqueue an update that didn't have the time to finish
/// when MeiliSearch closed.
/// when Meilisearch closed.
pub fn reset_and_return_unfinished_tasks(&mut self) -> Result<BinaryHeap<Pending<TaskId>>> {
let mut unfinished_tasks: BinaryHeap<Pending<TaskId>> = BinaryHeap::new();
@@ -257,10 +266,10 @@ pub mod test {
Fake(Mocker),
}
pub struct TmpEnv(TempDir, heed::Env);
pub struct TmpEnv(TempDir, Arc<heed::Env>);
impl TmpEnv {
pub fn env(&self) -> heed::Env {
pub fn env(&self) -> Arc<heed::Env> {
self.1.clone()
}
}
@@ -271,13 +280,13 @@ pub mod test {
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100000);
options.max_dbs(1000);
let env = options.open(tmp.path()).unwrap();
let env = Arc::new(options.open(tmp.path()).unwrap());
TmpEnv(tmp, env)
}
impl MockStore {
pub fn new(env: heed::Env) -> Result<Self> {
pub fn new(env: Arc<heed::Env>) -> Result<Self> {
Ok(Self::Real(Store::new(env)?))
}