Merge branch 'main' into stable

This commit is contained in:
Clémentine Urquizar - curqui 2022-10-04 14:20:10 +02:00 committed by GitHub
commit a7d2c9572e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 824 additions and 139 deletions

View file

@ -8,6 +8,7 @@ use meilisearch_types::internal_error;
use milli::documents::{DocumentsBatchBuilder, Error};
use milli::Object;
use serde::Deserialize;
use serde_json::error::Category;
type Result<T> = std::result::Result<T, DocumentFormatError>;
@ -40,18 +41,32 @@ impl Display for DocumentFormatError {
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
Self::MalformedPayload(me, b) => match me.borrow() {
Error::Json(se) => {
let mut message = match se.classify() {
Category::Data => {
"data are neither an object nor a list of objects".to_string()
}
_ => se.to_string(),
};
// https://github.com/meilisearch/meilisearch/issues/2107
// The user input maybe insanely long. We need to truncate it.
let mut serde_msg = se.to_string();
let ellipsis = "...";
if serde_msg.len() > 100 + ellipsis.len() {
serde_msg.replace_range(50..serde_msg.len() - 85, ellipsis);
let trim_input_prefix_len = 50;
let trim_input_suffix_len = 85;
if message.len()
> trim_input_prefix_len + trim_input_suffix_len + ellipsis.len()
{
message.replace_range(
trim_input_prefix_len..message.len() - trim_input_suffix_len,
ellipsis,
);
}
write!(
f,
"The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
b, serde_msg
b, message
)
}
_ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),

View file

@ -145,7 +145,7 @@ pub fn error_code_from_str(s: &str) -> anyhow::Result<Code> {
"unsupported_media_type" => Code::UnsupportedMediaType,
"dump_already_in_progress" => Code::DumpAlreadyInProgress,
"dump_process_failed" => Code::DumpProcessFailed,
_ => bail!("unknow error code."),
_ => bail!("unknown error code."),
};
Ok(code)

View file

@ -1,24 +0,0 @@
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::index_controller::IndexMetadata;
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
pub db_version: String,
indexes: Vec<IndexMetadata>,
}
impl MetadataV1 {
#[allow(dead_code, unreachable_code, unused_variables)]
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")
}

View file

@ -57,10 +57,10 @@ fn patch_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result
let updates_path = src.as_ref().join("updates/data.jsonl");
let output_updates_path = dst.as_ref().join("updates/data.jsonl");
create_dir_all(output_updates_path.parent().unwrap())?;
let udpates_file = File::open(updates_path)?;
let updates_file = File::open(updates_path)?;
let mut output_update_file = File::create(output_updates_path)?;
serde_json::Deserializer::from_reader(udpates_file)
serde_json::Deserializer::from_reader(updates_file)
.into_iter::<compat::v4::Task>()
.try_for_each(|task| -> anyhow::Result<()> {
let task: Task = task?.into();

View file

@ -105,6 +105,7 @@ impl Index {
let mut options = EnvOpenOptions::new();
options.map_size(size);
options.max_readers(1024);
let index = milli::Index::new(options, &dst_dir_path)?;
let mut txn = index.write_txn()?;

View file

@ -94,6 +94,7 @@ impl Index {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
options.max_readers(1024);
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index {
inner,

View file

@ -27,7 +27,7 @@ pub const DEFAULT_CROP_MARKER: fn() -> String = || "…".to_string();
pub const DEFAULT_HIGHLIGHT_PRE_TAG: fn() -> String = || "<em>".to_string();
pub const DEFAULT_HIGHLIGHT_POST_TAG: fn() -> String = || "</em>".to_string();
/// The maximimum number of results that the engine
/// The maximum number of results that the engine
/// will be able to return in one search call.
pub const DEFAULT_PAGINATION_MAX_TOTAL_HITS: usize = 1000;

View file

@ -51,7 +51,7 @@ impl MapIndexStore {
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid) -> Result<Index> {
// We need to keep the lock until we are sure the db file has been opened correclty, to
// We need to keep the lock until we are sure the db file has been opened correctly, to
// ensure that another db is not created at the same time.
let mut lock = self.index_store.write().await;

View file

@ -11,6 +11,8 @@ mod snapshot;
pub mod tasks;
mod update_file_store;
use std::env::VarError;
use std::ffi::OsStr;
use std::path::Path;
pub use index_controller::MeiliSearch;
@ -35,3 +37,14 @@ pub fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
true
}
}
/// Checks if the key is defined in the environment variables.
/// If not, inserts it with the given value.
pub fn export_to_env_if_not_present<T>(key: &str, value: T)
where
T: AsRef<OsStr>,
{
if let Err(VarError::NotPresent) = std::env::var(key) {
std::env::set_var(key, value);
}
}

View file

@ -1,33 +1,40 @@
use crate::export_to_env_if_not_present;
use core::fmt;
use std::{convert::TryFrom, num::ParseIntError, ops::Deref, str::FromStr};
use byte_unit::{Byte, ByteError};
use clap::Parser;
use milli::update::IndexerConfig;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use sysinfo::{RefreshKind, System, SystemExt};
#[derive(Debug, Clone, Parser, Serialize)]
const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
const DISABLE_AUTO_BATCHING: &str = "DISABLE_AUTO_BATCHING";
const DEFAULT_LOG_EVERY_N: usize = 100000;
#[derive(Debug, Clone, Parser, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct IndexerOpts {
/// The amount of documents to skip before printing
/// a log regarding the indexing advancement.
#[serde(skip)]
#[clap(long, default_value = "100000", hide = true)] // 100k
#[serde(skip_serializing, default = "default_log_every_n")]
#[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k
pub log_every_n: usize,
/// Grenad max number of chunks in bytes.
#[serde(skip)]
#[serde(skip_serializing)]
#[clap(long, hide = true)]
pub max_nb_chunks: Option<usize>,
/// The maximum amount of memory the indexer will use. It defaults to 2/3
/// of the available memory. It is recommended to use something like 80%-90%
/// of the available memory, no more.
/// The maximum amount of memory the indexer will use.
///
/// In case the engine is unable to retrieve the available memory the engine will
/// try to use the memory it needs but without real limit, this can lead to
/// Out-Of-Memory issues and it is recommended to specify the amount of memory to use.
#[clap(long, env = "MEILI_MAX_INDEXING_MEMORY", default_value_t)]
#[clap(long, env = MEILI_MAX_INDEXING_MEMORY, default_value_t)]
#[serde(default)]
pub max_indexing_memory: MaxMemory,
/// The maximum number of threads the indexer will use.
@ -35,18 +42,43 @@ pub struct IndexerOpts {
/// it will use the maximum number of available cores.
///
/// It defaults to half of the available threads.
#[clap(long, env = "MEILI_MAX_INDEXING_THREADS", default_value_t)]
#[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)]
#[serde(default)]
pub max_indexing_threads: MaxThreads,
}
#[derive(Debug, Clone, Parser, Default, Serialize)]
#[derive(Debug, Clone, Parser, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SchedulerConfig {
/// The engine will disable task auto-batching,
/// and will sequencialy compute each task one by one.
#[clap(long, env = "DISABLE_AUTO_BATCHING")]
#[clap(long, env = DISABLE_AUTO_BATCHING)]
#[serde(default)]
pub disable_auto_batching: bool,
}
impl IndexerOpts {
/// Exports the values to their corresponding env vars if they are not set.
pub fn export_to_env(self) {
let IndexerOpts {
max_indexing_memory,
max_indexing_threads,
log_every_n: _,
max_nb_chunks: _,
} = self;
if let Some(max_indexing_memory) = max_indexing_memory.0 {
export_to_env_if_not_present(
MEILI_MAX_INDEXING_MEMORY,
max_indexing_memory.to_string(),
);
}
export_to_env_if_not_present(
MEILI_MAX_INDEXING_THREADS,
max_indexing_threads.0.to_string(),
);
}
}
impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;
@ -77,8 +109,17 @@ impl Default for IndexerOpts {
}
}
impl SchedulerConfig {
pub fn export_to_env(self) {
let SchedulerConfig {
disable_auto_batching,
} = self;
export_to_env_if_not_present(DISABLE_AUTO_BATCHING, disable_auto_batching.to_string());
}
}
/// A type used to detect the max memory available and use 2/3 of it.
#[derive(Debug, Clone, Copy, Serialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MaxMemory(Option<Byte>);
impl FromStr for MaxMemory {
@ -134,7 +175,7 @@ fn total_memory_bytes() -> Option<u64> {
}
}
#[derive(Debug, Clone, Copy, Serialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MaxThreads(usize);
impl FromStr for MaxThreads {
@ -164,3 +205,7 @@ impl Deref for MaxThreads {
&self.0
}
}
fn default_log_every_n() -> usize {
DEFAULT_LOG_EVERY_N
}

View file

@ -181,6 +181,7 @@ impl SnapshotJob {
let mut options = milli::heed::EnvOpenOptions::new();
options.map_size(self.index_size);
options.max_readers(1024);
let index = milli::Index::new(options, entry.path())?;
index.copy_to_path(dst, CompactionOption::Enabled)?;
}

View file

@ -117,7 +117,7 @@ impl TaskStore {
match filter {
Some(filter) => filter
.pass(&task)
.then(|| task)
.then_some(task)
.ok_or(TaskError::UnexistingTask(id)),
None => Ok(task),
}

View file

@ -63,7 +63,7 @@ impl Store {
/// Returns the id for the next task.
///
/// The required `mut txn` acts as a reservation system. It guarantees that as long as you commit
/// the task to the store in the same transaction, no one else will hav this task id.
/// the task to the store in the same transaction, no one else will have this task id.
pub fn next_task_id(&self, txn: &mut RwTxn) -> Result<TaskId> {
let id = self
.tasks