mirror of
synced 2025-02-10 12:33:29 +01:00
Remove once for all the meilisearch-lib crate
This commit is contained in:
@ -1,170 +0,0 @@
use std::borrow::Borrow;
use std::fmt::{self, Debug, Display};
use std::io::{self, BufReader, Read, Seek, Write};
use either::Either;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::internal_error;
use milli::documents::{DocumentsBatchBuilder, Error};
use milli::Object;
use serde::Deserialize;
use serde_json::error::Category;
type Result<T> = std::result::Result<T, DocumentFormatError>;
pub enum PayloadType {
impl fmt::Display for PayloadType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PayloadType::Ndjson => f.write_str("ndjson"),
PayloadType::Json => f.write_str("json"),
PayloadType::Csv => f.write_str("csv"),
pub enum DocumentFormatError {
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
MalformedPayload(Error, PayloadType),
impl Display for DocumentFormatError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
Self::MalformedPayload(me, b) => match me.borrow() {
Error::Json(se) => {
let mut message = match se.classify() {
Category::Data => {
"data are neither an object nor a list of objects".to_string()
_ => se.to_string(),
// https://github.com/meilisearch/meilisearch/issues/2107
// The user input maybe insanely long. We need to truncate it.
let ellipsis = "...";
let trim_input_prefix_len = 50;
let trim_input_suffix_len = 85;
if message.len()
> trim_input_prefix_len + trim_input_suffix_len + ellipsis.len()
trim_input_prefix_len..message.len() - trim_input_suffix_len,
"The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
b, message
_ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
impl std::error::Error for DocumentFormatError {}
impl From<(PayloadType, Error)> for DocumentFormatError {
fn from((ty, error): (PayloadType, Error)) -> Self {
match error {
Error::Io(e) => Self::Internal(Box::new(e)),
e => Self::MalformedPayload(e, ty),
impl ErrorCode for DocumentFormatError {
fn error_code(&self) -> Code {
match self {
DocumentFormatError::Internal(_) => Code::Internal,
DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
internal_error!(DocumentFormatError: io::Error);
/// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let csv = csv::Reader::from_reader(input);
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
let count = builder.documents_count();
let _ = builder
Ok(count as usize)
/// Reads JSON Lines from input and write an obkv batch to writer.
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input);
for result in serde_json::Deserializer::from_reader(reader).into_iter() {
let object = result
.map_err(|e| (PayloadType::Ndjson, e))?;
let count = builder.documents_count();
let _ = builder
Ok(count as usize)
/// Reads JSON from input and write an obkv batch to writer.
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
let mut builder = DocumentsBatchBuilder::new(writer);
let reader = BufReader::new(input);
#[derive(Deserialize, Debug)]
struct ArrayOrSingleObject {
#[serde(with = "either::serde_untagged")]
inner: Either<Vec<Object>, Object>,
let content: ArrayOrSingleObject = serde_json::from_reader(reader)
.map_err(|e| (PayloadType::Json, e))?;
for object in content.inner.map_right(|o| vec![o]).into_inner() {
let count = builder.documents_count();
let _ = builder
Ok(count as usize)
@ -1,152 +0,0 @@
use anyhow::bail;
use meilisearch_types::error::Code;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::index::{Settings, Unchecked};
#[derive(Serialize, Deserialize)]
pub struct UpdateEntry {
pub uuid: Uuid,
pub update: UpdateStatus,
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateFormat {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DocumentAdditionResult {
pub nb_documents: usize,
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
DocumentDeletion { deleted: u64 },
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
DocumentsAddition {
method: IndexDocumentsMethod,
format: UpdateFormat,
primary_key: Option<String>,
DeleteDocuments {
ids: Vec<String>,
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Enqueued {
pub update_id: u64,
pub meta: UpdateMeta,
#[serde(with = "time::serde::rfc3339")]
pub enqueued_at: OffsetDateTime,
pub content: Option<Uuid>,
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processed {
pub success: UpdateResult,
#[serde(with = "time::serde::rfc3339")]
pub processed_at: OffsetDateTime,
pub from: Processing,
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processing {
pub from: Enqueued,
#[serde(with = "time::serde::rfc3339")]
pub started_processing_at: OffsetDateTime,
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Aborted {
pub from: Enqueued,
#[serde(with = "time::serde::rfc3339")]
pub aborted_at: OffsetDateTime,
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
pub from: Processing,
pub error: ResponseError,
#[serde(with = "time::serde::rfc3339")]
pub failed_at: OffsetDateTime,
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus {
type StatusCode = ();
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResponseError {
pub code: StatusCode,
pub message: String,
pub error_code: String,
pub error_type: String,
pub error_link: String,
pub fn error_code_from_str(s: &str) -> anyhow::Result<Code> {
let code = match s {
"index_creation_failed" => Code::CreateIndex,
"index_already_exists" => Code::IndexAlreadyExists,
"index_not_found" => Code::IndexNotFound,
"invalid_index_uid" => Code::InvalidIndexUid,
"invalid_state" => Code::InvalidState,
"missing_primary_key" => Code::MissingPrimaryKey,
"primary_key_already_present" => Code::PrimaryKeyAlreadyPresent,
"invalid_request" => Code::InvalidRankingRule,
"max_fields_limit_exceeded" => Code::MaxFieldsLimitExceeded,
"missing_document_id" => Code::MissingDocumentId,
"invalid_facet" => Code::Filter,
"invalid_filter" => Code::Filter,
"invalid_sort" => Code::Sort,
"bad_parameter" => Code::BadParameter,
"bad_request" => Code::BadRequest,
"document_not_found" => Code::DocumentNotFound,
"internal" => Code::Internal,
"invalid_geo_field" => Code::InvalidGeoField,
"invalid_token" => Code::InvalidToken,
"missing_authorization_header" => Code::MissingAuthorizationHeader,
"payload_too_large" => Code::PayloadTooLarge,
"unretrievable_document" => Code::RetrieveDocument,
"search_error" => Code::SearchDocuments,
"unsupported_media_type" => Code::UnsupportedMediaType,
"dump_already_in_progress" => Code::DumpAlreadyInProgress,
"dump_process_failed" => Code::DumpProcessFailed,
_ => bail!("unknown error code."),
@ -1,103 +0,0 @@
use std::fs::{self, create_dir_all, File};
use std::io::{BufReader, Write};
use std::path::Path;
use fs_extra::dir::{self, CopyOptions};
use log::info;
use serde_json::{Deserializer, Map, Value};
use tempfile::tempdir;
use uuid::Uuid;
use crate::dump::{compat, Metadata};
use crate::options::IndexerOpts;
use crate::tasks::task::Task;
pub fn load_dump(
meta: Metadata,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!("Patching dump V4 to dump V5...");
let patched_dir = tempdir()?;
let options = CopyOptions::default();
// Indexes
dir::copy(src.as_ref().join("indexes"), &patched_dir, &options)?;
// Index uuids
dir::copy(src.as_ref().join("index_uuids"), &patched_dir, &options)?;
// Metadata
// Updates
patch_updates(&src, &patched_dir)?;
// Keys
patch_keys(&src, &patched_dir)?;
fn patch_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let updates_path = src.as_ref().join("updates/data.jsonl");
let output_updates_path = dst.as_ref().join("updates/data.jsonl");
let updates_file = File::open(updates_path)?;
let mut output_update_file = File::create(output_updates_path)?;
.try_for_each(|task| -> anyhow::Result<()> {
let task: Task = task?.into();
serde_json::to_writer(&mut output_update_file, &task)?;
fn patch_keys(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let keys_file_src = src.as_ref().join("keys");
if !keys_file_src.exists() {
return Ok(());
let keys_file_dst = dst.as_ref().join("keys");
let mut writer = File::create(&keys_file_dst)?;
let reader = BufReader::new(File::open(&keys_file_src)?);
for key in Deserializer::from_reader(reader).into_iter() {
let mut key: Map<String, Value> = key?;
// generate a new uuid v4 and insert it in the key.
let uid = serde_json::to_value(Uuid::new_v4()).unwrap();
key.insert("uid".to_string(), uid);
serde_json::to_writer(&mut writer, &key)?;
@ -1,161 +0,0 @@
use std::fs::{create_dir_all, File};
use std::io::{BufReader, Seek, SeekFrom, Write};
use std::path::Path;
use anyhow::Context;
use indexmap::IndexMap;
use milli::documents::DocumentsBatchReader;
use milli::heed::{EnvOpenOptions, RoTxn};
use milli::update::{IndexDocumentsConfig, IndexerConfig};
use serde::{Deserialize, Serialize};
use crate::document_formats::read_ndjson;
use crate::index::updates::apply_settings_to_builder;
use super::error::Result;
use super::{index::Index, Settings, Unchecked};
#[derive(Serialize, Deserialize)]
struct DumpMeta {
settings: Settings<Unchecked>,
primary_key: Option<String>,
const META_FILE_NAME: &str = "meta.json";
const DATA_FILE_NAME: &str = "documents.jsonl";
impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
// acquire write txn make sure any ongoing write is finished before we start.
let txn = self.write_txn()?;
let path = path.as_ref().join(format!("indexes/{}", self.uuid));
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> Result<()> {
let document_file_path = path.as_ref().join(DATA_FILE_NAME);
let mut document_file = File::create(&document_file_path)?;
let documents = self.all_documents(txn)?;
let fields_ids_map = self.fields_ids_map(txn)?;
// dump documents
let mut json_map = IndexMap::new();
for document in documents {
let (_, reader) = document?;
for (fid, bytes) in reader.iter() {
if let Some(name) = fields_ids_map.name(fid) {
json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
serde_json::to_writer(&mut document_file, &json_map)?;
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> Result<()> {
let meta_file_path = path.as_ref().join(META_FILE_NAME);
let mut meta_file = File::create(&meta_file_path)?;
let settings = self.settings_txn(txn)?.into_unchecked();
let primary_key = self.primary_key(txn)?.map(String::from);
let meta = DumpMeta {
serde_json::to_writer(&mut meta_file, &meta)?;
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let dir_name = src
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
let meta_path = src.as_ref().join(META_FILE_NAME);
let meta_file = File::open(meta_path)?;
let DumpMeta {
} = serde_json::from_reader(meta_file)?;
let settings = settings.check();
let mut options = EnvOpenOptions::new();
let index = milli::Index::new(options, &dst_dir_path)?;
let mut txn = index.write_txn()?;
// Apply settings first
let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config);
if let Some(primary_key) = primary_key {
apply_settings_to_builder(&settings, &mut builder);
builder.execute(|_| ())?;
let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = BufReader::new(File::open(&document_file_path)?);
let mut tmp_doc_file = tempfile::tempfile()?;
let empty = match read_ndjson(reader, &mut tmp_doc_file) {
// if there was no document in the file it's because the index was empty
Ok(0) => true,
Ok(_) => false,
Err(e) => return Err(e.into()),
if !empty {
let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?;
//If the document file is empty, we don't perform the document addition, to prevent
//a primary key error to be thrown.
let config = IndexDocumentsConfig::default();
let builder = milli::update::IndexDocuments::new(
&mut txn,
|_| (),
let (builder, user_error) = builder.add_documents(documents_reader)?;
@ -1,333 +0,0 @@
use std::collections::BTreeSet;
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use fst::IntoStreamer;
use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
use milli::update::{IndexerConfig, Setting};
use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use time::OffsetDateTime;
use uuid::Uuid;
use walkdir::WalkDir;
use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
use super::error::IndexError;
use super::error::Result;
use super::updates::{FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, TypoSettings};
use super::{Checked, Settings};
pub type Document = Map<String, Value>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
pub updated_at: OffsetDateTime,
pub primary_key: Option<String>,
impl IndexMeta {
pub fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(txn)?;
let updated_at = index.updated_at(txn)?;
let primary_key = index.primary_key(txn)?.map(String::from);
Ok(Self {
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
#[derive(Clone, derivative::Derivative)]
pub struct Index {
pub uuid: Uuid,
#[derivative(Debug = "ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug = "ignore")]
pub indexer_config: Arc<IndexerConfig>,
impl Deref for Index {
type Target = milli::Index;
fn deref(&self) -> &Self::Target {
impl Index {
pub fn open(
path: impl AsRef<Path>,
size: usize,
uuid: Uuid,
update_handler: Arc<IndexerConfig>,
) -> Result<Self> {
log::debug!("opening index in {}", path.as_ref().display());
let mut options = EnvOpenOptions::new();
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index {
indexer_config: update_handler,
/// Asynchronously close the underlying index
pub fn close(self) {
pub fn stats(&self) -> Result<IndexStats> {
let rtxn = self.read_txn()?;
Ok(IndexStats {
size: self.size(),
number_of_documents: self.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: self.field_distribution(&rtxn)?,
pub fn meta(&self) -> Result<IndexMeta> {
pub fn settings(&self) -> Result<Settings<Checked>> {
let txn = self.read_txn()?;
pub fn uuid(&self) -> Uuid {
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
let displayed_attributes = self
.map(|fields| fields.into_iter().map(String::from).collect());
let searchable_attributes = self
.map(|fields| fields.into_iter().map(String::from).collect());
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
let criteria = self
.map(|c| c.to_string())
let stop_words = self
.map(|stop_words| -> Result<BTreeSet<_>> {
let distinct_field = self.distinct_field(txn)?.map(String::from);
// in milli each word in the synonyms map were split on their separator. Since we lost
// this information we are going to put space between words.
let synonyms = self
.map(|(key, values)| {
key.join(" "),
values.iter().map(|value| value.join(" ")).collect(),
let min_typo_word_len = MinWordSizeTyposSetting {
one_typo: Setting::Set(self.min_word_len_one_typo(txn)?),
two_typos: Setting::Set(self.min_word_len_two_typos(txn)?),
let disabled_words = match self.exact_words(txn)? {
Some(fst) => fst.into_stream().into_strs()?.into_iter().collect(),
None => BTreeSet::new(),
let disabled_attributes = self
let typo_tolerance = TypoSettings {
enabled: Setting::Set(self.authorize_typos(txn)?),
min_word_size_for_typos: Setting::Set(min_typo_word_len),
disable_on_words: Setting::Set(disabled_words),
disable_on_attributes: Setting::Set(disabled_attributes),
let faceting = FacetingSettings {
max_values_per_facet: Setting::Set(
let pagination = PaginationSettings {
max_total_hits: Setting::Set(
Ok(Settings {
displayed_attributes: match displayed_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
searchable_attributes: match searchable_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
filterable_attributes: Setting::Set(filterable_attributes),
sortable_attributes: Setting::Set(sortable_attributes),
ranking_rules: Setting::Set(criteria),
stop_words: Setting::Set(stop_words),
distinct_attribute: match distinct_field {
Some(field) => Setting::Set(field),
None => Setting::Reset,
synonyms: Setting::Set(synonyms),
typo_tolerance: Setting::Set(typo_tolerance),
faceting: Setting::Set(faceting),
pagination: Setting::Set(pagination),
_kind: PhantomData,
/// Return the total number of documents contained in the index + the selected documents.
pub fn retrieve_documents<S: AsRef<str>>(
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<(u64, Vec<Document>)> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let mut documents = Vec::new();
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
let (_id, obkv) = entry?;
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
let document = match &attributes_to_retrieve {
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
attributes_to_retrieve.iter().map(|s| s.as_ref()),
None => document,
let number_of_documents = self.number_of_documents(&txn)?;
Ok((number_of_documents, documents))
pub fn retrieve_document<S: AsRef<str>>(
doc_id: String,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Document> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let internal_id = self
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
let document = self
.documents(&txn, std::iter::once(internal_id))?
.map(|(_, d)| d)
let document = obkv_to_json(&all_fields, &fields_ids_map, document)?;
let document = match &attributes_to_retrieve {
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
attributes_to_retrieve.iter().map(|s| s.as_ref()),
None => document,
pub fn size(&self) -> u64 {
.filter_map(|entry| entry.ok())
.filter_map(|entry| entry.metadata().ok())
.filter(|metadata| metadata.is_file())
.fold(0, |acc, m| acc + m.len())
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
let _txn = self.write_txn()?;
self.inner.copy_to_path(dst, CompactionOption::Enabled)?;
/// When running tests, when a server instance is dropped, the environment is not actually closed,
/// leaving a lot of open file descriptors.
impl Drop for Index {
fn drop(&mut self) {
// When dropping the last instance of an index, we want to close the index
// Note that the close is actually performed only if all the instances a effectively
// dropped
if Arc::strong_count(&self.inner) == 1 {
@ -1,108 +0,0 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use milli::update::IndexerConfig;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::error::{IndexResolverError, Result};
use crate::index::Index;
use crate::options::IndexerOpts;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[cfg_attr(test, mockall::automock)]
pub trait IndexStore {
async fn create(&self, uuid: Uuid) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
pub struct MapIndexStore {
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
index_size: usize,
indexer_config: Arc<IndexerConfig>,
impl MapIndexStore {
pub fn new(
path: impl AsRef<Path>,
index_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<Self> {
let indexer_config = Arc::new(IndexerConfig::try_from(indexer_opts)?);
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self {
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid) -> Result<Index> {
// We need to keep the lock until we are sure the db file has been opened correctly, to
// ensure that another db is not created at the same time.
let mut lock = self.index_store.write().await;
if let Some(index) = lock.get(&uuid) {
return Ok(index.clone());
let path = self.path.join(format!("{}", uuid));
if path.exists() {
return Err(IndexResolverError::UuidAlreadyExists(uuid));
let index_size = self.index_size;
let update_handler = self.indexer_config.clone();
let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, uuid, update_handler)?;
lock.insert(uuid, index.clone());
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
None => {
// drop the guard here so we can perform the write after without deadlocking;
let path = self.path.join(format!("{}", uuid));
if !path.exists() {
return Ok(None);
let index_size = self.index_size;
let update_handler = self.indexer_config.clone();
let index =
spawn_blocking(move || Index::open(path, index_size, uuid, update_handler))
self.index_store.write().await.insert(uuid, index.clone());
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("{}", uuid));
let index = self.index_store.write().await.remove(&uuid);
@ -1,50 +0,0 @@
pub mod error;
pub mod options;
mod analytics;
mod document_formats;
// TODO: TAMO: reenable the dumps
mod dump;
mod index_controller;
mod snapshot;
use std::env::VarError;
use std::ffi::OsStr;
use std::path::Path;
// TODO: TAMO: rename the MeiliSearch in Meilisearch
pub use index_controller::error::IndexControllerError;
pub use index_controller::Meilisearch as MeiliSearch;
pub use milli;
pub use milli::heed;
mod compression;
/// Check if a db is empty. It does not provide any information on the
/// validity of the data in it.
/// We consider a database as non empty when it's a non empty directory.
pub fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
let db_path = db_path.as_ref();
if !db_path.exists() {
// if we encounter an error or if the db is a file we consider the db non empty
} else if let Ok(dir) = db_path.read_dir() {
dir.count() == 0
} else {
/// Checks if the key is defined in the environment variables.
/// If not, inserts it with the given value.
pub fn export_to_env_if_not_present<T>(key: &str, value: T)
T: AsRef<OsStr>,
if let Err(VarError::NotPresent) = std::env::var(key) {
std::env::set_var(key, value);
@ -1,205 +0,0 @@
use crate::export_to_env_if_not_present;
use core::fmt;
use std::{convert::TryFrom, num::ParseIntError, ops::Deref, str::FromStr};
use byte_unit::{Byte, ByteError};
use clap::Parser;
use milli::update::IndexerConfig;
use serde::{Deserialize, Serialize};
use sysinfo::{RefreshKind, System, SystemExt};
const DEFAULT_LOG_EVERY_N: usize = 100000;
#[derive(Debug, Clone, Parser, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct IndexerOpts {
/// Sets the amount of documents to skip before printing
/// a log regarding the indexing advancement.
#[serde(skip_serializing, default = "default_log_every_n")]
#[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k
pub log_every_n: usize,
/// Grenad max number of chunks in bytes.
#[clap(long, hide = true)]
pub max_nb_chunks: Option<usize>,
/// Sets the maximum amount of RAM Meilisearch can use when indexing. By default, Meilisearch
/// uses no more than two thirds of available memory.
#[clap(long, env = MEILI_MAX_INDEXING_MEMORY, default_value_t)]
pub max_indexing_memory: MaxMemory,
/// Sets the maximum number of threads Meilisearch can use during indexation. By default, the
/// indexer avoids using more than half of a machine's total processing units. This ensures
/// Meilisearch is always ready to perform searches, even while you are updating an index.
#[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)]
pub max_indexing_threads: MaxThreads,
#[derive(Debug, Clone, Parser, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SchedulerConfig {
/// Deactivates auto-batching when provided.
#[clap(long, env = DISABLE_AUTO_BATCHING)]
pub disable_auto_batching: bool,
impl IndexerOpts {
/// Exports the values to their corresponding env vars if they are not set.
pub fn export_to_env(self) {
let IndexerOpts {
log_every_n: _,
max_nb_chunks: _,
} = self;
if let Some(max_indexing_memory) = max_indexing_memory.0 {
impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
let thread_pool = rayon::ThreadPoolBuilder::new()
Ok(Self {
log_every_n: Some(other.log_every_n),
max_nb_chunks: other.max_nb_chunks,
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
thread_pool: Some(thread_pool),
max_positions_per_attributes: None,
impl Default for IndexerOpts {
fn default() -> Self {
Self {
log_every_n: 100_000,
max_nb_chunks: None,
max_indexing_memory: MaxMemory::default(),
max_indexing_threads: MaxThreads::default(),
impl SchedulerConfig {
pub fn export_to_env(self) {
let SchedulerConfig {
} = self;
export_to_env_if_not_present(DISABLE_AUTO_BATCHING, disable_auto_batching.to_string());
/// A type used to detect the max memory available and use 2/3 of it.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MaxMemory(Option<Byte>);
impl FromStr for MaxMemory {
type Err = ByteError;
fn from_str(s: &str) -> Result<MaxMemory, ByteError> {
impl Default for MaxMemory {
fn default() -> MaxMemory {
.map(|bytes| bytes * 2 / 3)
impl fmt::Display for MaxMemory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)),
None => f.write_str("unknown"),
impl Deref for MaxMemory {
type Target = Option<Byte>;
fn deref(&self) -> &Self::Target {
impl MaxMemory {
pub fn unlimited() -> Self {
/// Returns the total amount of bytes available or `None` if this system isn't supported.
fn total_memory_bytes() -> Option<u64> {
if System::IS_SUPPORTED {
let memory_kind = RefreshKind::new().with_memory();
let mut system = System::new_with_specifics(memory_kind);
Some(system.total_memory() * 1024) // KiB into bytes
} else {
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MaxThreads(usize);
impl FromStr for MaxThreads {
type Err = ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
impl Default for MaxThreads {
fn default() -> Self {
MaxThreads(num_cpus::get() / 2)
impl fmt::Display for MaxThreads {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
impl Deref for MaxThreads {
type Target = usize;
fn deref(&self) -> &Self::Target {
fn default_log_every_n() -> usize {
@ -1,204 +0,0 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use fs_extra::dir::{self, CopyOptions};
use log::{info, trace};
use meilisearch_auth::open_auth_store_env;
use milli::heed::CompactionOption;
use tokio::sync::RwLock;
use tokio::time::sleep;
use walkdir::WalkDir;
use crate::compression::from_tar_gz;
use crate::index_controller::open_meta_env;
use crate::index_controller::versioning::VERSION_FILE_NAME;
use index_scheduler::IndexScheduler;
pub struct SnapshotService {
pub(crate) db_path: PathBuf,
pub(crate) snapshot_period: Duration,
pub(crate) snapshot_path: PathBuf,
pub(crate) index_size: usize,
pub(crate) meta_env_size: usize,
pub(crate) scheduler: IndexScheduler,
impl SnapshotService {
pub async fn run(self) {
"Snapshot scheduled every {}s.",
loop {
let snapshot_job = SnapshotJob {
dest_path: self.snapshot_path.clone(),
src_path: self.db_path.clone(),
meta_env_size: self.meta_env_size,
index_size: self.index_size,
// TODO: TAMO: reenable the snapshots
// self.scheduler.write().await.schedule_snapshot(snapshot_job);
pub fn load_snapshot(
db_path: impl AsRef<Path>,
snapshot_path: impl AsRef<Path>,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool,
) -> anyhow::Result<()> {
let empty_db = crate::is_empty_db(&db_path);
let snapshot_path_exists = snapshot_path.as_ref().exists();
if empty_db && snapshot_path_exists {
match from_tar_gz(snapshot_path, &db_path) {
Ok(()) => Ok(()),
Err(e) => {
//clean created db folder
} else if !empty_db && !ignore_snapshot_if_db_exists {
"database already exists at {:?}, try to delete it or rename it",
.unwrap_or_else(|_| db_path.as_ref().to_owned())
} else if !snapshot_path_exists && !ignore_missing_snapshot {
bail!("snapshot doesn't exist at {:?}", snapshot_path.as_ref())
} else {
pub struct SnapshotJob {
dest_path: PathBuf,
src_path: PathBuf,
meta_env_size: usize,
index_size: usize,
impl SnapshotJob {
pub async fn run(self) -> anyhow::Result<()> {
tokio::task::spawn_blocking(|| self.run_sync()).await??;
fn run_sync(self) -> anyhow::Result<()> {
trace!("Performing snapshot.");
let snapshot_dir = self.dest_path.clone();
let temp_snapshot_dir = tempfile::tempdir()?;
let temp_snapshot_path = temp_snapshot_dir.path();
let db_name = self
.and_then(|n| n.to_str())
let snapshot_path = self.dest_path.join(format!("{}.snapshot", db_name));
let temp_snapshot_file = tempfile::NamedTempFile::new_in(&snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
let _file = temp_snapshot_file.persist(&snapshot_path)?;
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
let perm = Permissions::from_mode(0o644);
trace!("Created snapshot in {:?}.", snapshot_path);
fn snapshot_version_file(&self, path: &Path) -> anyhow::Result<()> {
let dst = path.join(VERSION_FILE_NAME);
let src = self.src_path.join(VERSION_FILE_NAME);
fs::copy(src, dst)?;
fn snapshot_meta_env(&self, path: &Path) -> anyhow::Result<()> {
let env = open_meta_env(&self.src_path, self.meta_env_size)?;
let dst = path.join("data.mdb");
env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
fn snapshot_file_store(&self, path: &Path) -> anyhow::Result<()> {
// for now we simply copy the updates/updates_files
// FIXME(marin): We may copy more files than necessary, if new files are added while we are
// performing the snapshop. We need a way to filter them out.
let dst = path.join("updates");
let options = CopyOptions::default();
dir::copy(self.src_path.join("updates/updates_files"), dst, &options)?;
fn snapshot_indexes(&self, path: &Path) -> anyhow::Result<()> {
let indexes_path = self.src_path.join("indexes/");
let dst = path.join("indexes/");
for entry in WalkDir::new(indexes_path).max_depth(1).into_iter().skip(1) {
let entry = entry?;
let name = entry.file_name();
let dst = dst.join(name);
let dst = dst.join("data.mdb");
let mut options = milli::heed::EnvOpenOptions::new();
let index = milli::Index::new(options, entry.path())?;
index.copy_to_path(dst, CompactionOption::Enabled)?;
fn snapshot_auth(&self, path: &Path) -> anyhow::Result<()> {
let auth_path = self.src_path.join("auth");
let dst = path.join("auth");
let dst = dst.join("data.mdb");
let env = open_auth_store_env(&auth_path)?;
env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
@ -1,420 +0,0 @@
mod store;
use std::collections::HashSet;
use std::io::{BufWriter, Write};
use std::path::Path;
use std::sync::Arc;
use log::debug;
use milli::heed::{Env, RwTxn};
use time::OffsetDateTime;
use super::batch::BatchContent;
use super::error::TaskError;
use super::scheduler::Processing;
use super::task::{Task, TaskContent, TaskId};
use super::Result;
use crate::tasks::task::TaskEvent;
use crate::update_file_store::UpdateFileStore;
pub use store::test::MockStore as Store;
pub use store::Store;
type FilterFn = Box<dyn Fn(&Task) -> bool + Sync + Send + 'static>;
/// Defines constraints to be applied when querying for Tasks from the store.
pub struct TaskFilter {
indexes: Option<HashSet<String>>,
filter_fn: Option<FilterFn>,
impl TaskFilter {
fn pass(&self, task: &Task) -> bool {
match task.index_uid() {
Some(index_uid) => self
.map_or(true, |indexes| indexes.contains(index_uid)),
None => false,
fn filtered_indexes(&self) -> Option<&HashSet<String>> {
/// Adds an index to the filter, so the filter must match this index.
pub fn filter_index(&mut self, index: String) {
pub fn filter_fn(&mut self, f: FilterFn) {
pub struct TaskStore {
store: Arc<Store>,
impl Clone for TaskStore {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
impl TaskStore {
pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
let store = Arc::new(Store::new(env)?);
Ok(Self { store })
pub async fn register(&self, content: TaskContent) -> Result<Task> {
debug!("registering update: {:?}", content);
let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<Task> {
let mut txn = store.wtxn()?;
let next_task_id = store.next_task_id(&mut txn)?;
let created_at = TaskEvent::Created(OffsetDateTime::now_utc());
let task = Task {
id: next_task_id,
events: vec![created_at],
store.put(&mut txn, &task)?;
pub fn register_raw_update(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
self.store.put(wtxn, task)?;
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<_> {
let txn = store.rtxn()?;
let task = store.get(&txn, id)?;
match filter {
Some(filter) => filter
None => Ok(task),
/// This methods takes a `Processing` which contains the next task ids to process, and returns
/// the corresponding tasks along with the ownership to the passed processing.
/// We need get_processing_tasks to take ownership over `Processing` because we need it to be
/// valid for 'static.
pub async fn get_processing_tasks(
processing: Processing,
) -> Result<(Processing, BatchContent)> {
let store = self.store.clone();
let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
let txn = store.rtxn()?;
let content = match processing {
Processing::DocumentAdditions(ref ids) => {
let mut tasks = Vec::new();
for id in ids.iter() {
let task = store
.get(&txn, *id)?
Processing::IndexUpdate(id) => {
let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
Processing::Dump(id) => {
let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
debug_assert!(matches!(task.content, TaskContent::Dump { .. }));
Processing::Nothing => BatchContent::Empty,
Ok((processing, content))
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
let store = self.store.clone();
let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
let mut txn = store.wtxn()?;
for task in &tasks {
store.put(&mut txn, task)?;
pub async fn fetch_unfinished_tasks(&self, offset: Option<TaskId>) -> Result<Vec<Task>> {
let store = self.store.clone();
tokio::task::spawn_blocking(move || {
let txn = store.rtxn()?;
let tasks = store.fetch_unfinished_tasks(&txn, offset)?;
pub async fn list_tasks(
offset: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,
) -> Result<Vec<Task>> {
let store = self.store.clone();
tokio::task::spawn_blocking(move || {
let txn = store.rtxn()?;
let tasks = store.list_tasks(&txn, offset, filter, limit)?;
pub async fn dump(
env: Arc<Env>,
dir_path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> Result<()> {
let store = Self::new(env)?;
let update_dir = dir_path.as_ref().join("updates");
let updates_file = update_dir.join("data.jsonl");
let tasks = store.list_tasks(None, None, None).await?;
let dir_path = dir_path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || -> Result<()> {
let updates_file = std::fs::File::create(updates_file)?;
let mut updates_file = BufWriter::new(updates_file);
for task in tasks {
serde_json::to_writer(&mut updates_file, &task)?;
if !task.is_finished() {
if let Some(content_uuid) = task.get_content_uuid() {
update_file_store.dump(content_uuid, &dir_path)?;
pub fn load_dump(src: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> {
// create a dummy update field store, since it is not needed right now.
let store = Self::new(env.clone())?;
let src_update_path = src.as_ref().join("updates");
let update_data = std::fs::File::open(&src_update_path.join("data.jsonl"))?;
let update_data = std::io::BufReader::new(update_data);
let stream = serde_json::Deserializer::from_reader(update_data).into_iter::<Task>();
let mut wtxn = env.write_txn()?;
for entry in stream {
store.register_raw_update(&mut wtxn, &entry?)?;
pub mod test {
use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env};
use super::*;
use meilisearch_types::index_uid::IndexUid;
use nelson::Mocker;
use proptest::{
test_runner::{Config, TestRunner},
pub enum MockTaskStore {
impl Clone for MockTaskStore {
fn clone(&self) -> Self {
match self {
Self::Real(x) => Self::Real(x.clone()),
Self::Mock(x) => Self::Mock(x.clone()),
impl MockTaskStore {
pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
pub async fn dump(
env: Arc<milli::heed::Env>,
path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> Result<()> {
TaskStore::dump(env, path, update_file_store).await
pub fn mock(mocker: Mocker) -> Self {
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
match self {
Self::Real(s) => s.update_tasks(tasks).await,
Self::Mock(m) => unsafe {
m.get::<_, Result<Vec<Task>>>("update_tasks").call(tasks)
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
match self {
Self::Real(s) => s.get_task(id, filter).await,
Self::Mock(m) => unsafe { m.get::<_, Result<Task>>("get_task").call((id, filter)) },
pub async fn get_processing_tasks(
tasks: Processing,
) -> Result<(Processing, BatchContent)> {
match self {
Self::Real(s) => s.get_processing_tasks(tasks).await,
Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) },
pub async fn fetch_unfinished_tasks(&self, from: Option<TaskId>) -> Result<Vec<Task>> {
match self {
Self::Real(s) => s.fetch_unfinished_tasks(from).await,
Self::Mock(m) => unsafe { m.get("fetch_unfinished_tasks").call(from) },
pub async fn list_tasks(
from: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,
) -> Result<Vec<Task>> {
match self {
Self::Real(s) => s.list_tasks(from, filter, limit).await,
Self::Mock(m) => unsafe { m.get("list_tasks").call((from, filter, limit)) },
pub async fn register(&self, content: TaskContent) -> Result<Task> {
match self {
Self::Real(s) => s.register(content).await,
Self::Mock(_m) => todo!(),
pub fn register_raw_update(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
match self {
Self::Real(s) => s.register_raw_update(wtxn, task),
Self::Mock(_m) => todo!(),
pub fn load_dump(path: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> {
TaskStore::load_dump(path, env)
fn test_increment_task_id() {
let tmp = tmp_env();
let store = Store::new(tmp.env()).unwrap();
let mut txn = store.wtxn().unwrap();
assert_eq!(store.next_task_id(&mut txn).unwrap(), 0);
let gen_task = |id: TaskId| Task {
content: TaskContent::IndexCreation {
primary_key: None,
index_uid: IndexUid::new_unchecked("test"),
events: Vec::new(),
let mut runner = TestRunner::new(Config::default());
.run(&(0..100u32).prop_map(gen_task), |task| {
let mut txn = store.wtxn().unwrap();
let previous_id = store.next_task_id(&mut txn).unwrap();
store.put(&mut txn, &task).unwrap();
let next_id = store.next_task_id(&mut txn).unwrap();
// if we put a task whose task_id is less than the next_id, then the next_id remains
// unchanged, otherwise it becomes task.id + 1
if task.id < previous_id {
assert_eq!(next_id, previous_id)
} else {
assert_eq!(next_id, task.id + 1);
@ -1,377 +0,0 @@
type BEU32 = milli::heed::zerocopy::U32<milli::heed::byteorder::BE>;
const INDEX_UIDS_TASK_IDS: &str = "index-uids-task-ids";
const TASKS: &str = "tasks";
use std::collections::HashSet;
use std::ops::Bound::{Excluded, Unbounded};
use std::result::Result as StdResult;
use std::sync::Arc;
use milli::heed::types::{OwnedType, SerdeJson, Str};
use milli::heed::{Database, Env, RoTxn, RwTxn};
use milli::heed_codec::RoaringBitmapCodec;
use roaring::RoaringBitmap;
use crate::tasks::task::{Task, TaskId};
use super::super::Result;
use super::TaskFilter;
pub struct Store {
env: Arc<Env>,
/// Maps an index uid to the set of tasks ids associated to it.
index_uid_task_ids: Database<Str, RoaringBitmapCodec>,
tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
impl Drop for Store {
fn drop(&mut self) {
if Arc::strong_count(&self.env) == 1 {
impl Store {
/// Create a new store from the specified `Path`.
/// Be really cautious when calling this function, the returned `Store` may
/// be in an invalid state, with dangling processing tasks.
/// You want to patch all un-finished tasks and put them in your pending
/// queue with the `reset_and_return_unfinished_update` method.
pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
let index_uid_task_ids = env.create_database(Some(INDEX_UIDS_TASK_IDS))?;
let tasks = env.create_database(Some(TASKS))?;
Ok(Self {
pub fn wtxn(&self) -> Result<RwTxn> {
pub fn rtxn(&self) -> Result<RoTxn> {
/// Returns the id for the next task.
/// The required `mut txn` acts as a reservation system. It guarantees that as long as you commit
/// the task to the store in the same transaction, no one else will have this task id.
pub fn next_task_id(&self, txn: &mut RwTxn) -> Result<TaskId> {
let id = self
.map(|(id, _)| id.get() + 1)
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
self.tasks.put(txn, &BEU32::new(task.id), task)?;
// only add the task to the indexes index if it has an index_uid
if let Some(index_uid) = task.index_uid() {
let mut tasks_set = self
.get(txn, index_uid)?
self.index_uid_task_ids.put(txn, index_uid, &tasks_set)?;
pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result<Option<Task>> {
let task = self.tasks.get(txn, &BEU32::new(id))?;
/// Returns the unfinished tasks starting from the given taskId in ascending order.
pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option<TaskId>) -> Result<Vec<Task>> {
// We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file.
// TODO(marin): This may create some latency when the first batch lazy loads the pending updates.
let from = from.unwrap_or_default();
let result: StdResult<Vec<_>, milli::heed::Error> = self
.range(txn, &(BEU32::new(from)..))?
.map(|r| r.map(|(_, t)| t))
.filter(|result| result.as_ref().map_or(true, |t| !t.is_finished()))
/// Returns all the tasks starting from the given taskId and going in descending order.
pub fn list_tasks(
txn: &RoTxn,
from: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,
) -> Result<Vec<Task>> {
let from = match from {
Some(from) => from,
None => self.tasks.last(txn)?.map_or(0, |(id, _)| id.get()),
let filter_fn = |task: &Task| {
.and_then(|f| f.filter_fn.as_ref())
.map_or(true, |f| f(task))
let result: Result<Vec<_>> = match filter.as_ref().and_then(|f| f.filtered_indexes()) {
Some(indexes) => self
.compute_candidates(txn, indexes, from)?
.filter(|result| result.as_ref().map_or(true, filter_fn))
None => self
.rev_range(txn, &(..=BEU32::new(from)))?
.map(|r| r.map(|(_, t)| t).map_err(Into::into))
.filter(|result| result.as_ref().map_or(true, filter_fn))
fn compute_candidates<'a>(
&'a self,
txn: &'a RoTxn,
indexes: &HashSet<String>,
from: TaskId,
) -> Result<impl Iterator<Item = Result<Task>> + 'a> {
let mut candidates = RoaringBitmap::new();
for index_uid in indexes {
if let Some(tasks_set) = self.index_uid_task_ids.get(txn, index_uid)? {
candidates |= tasks_set;
candidates.remove_range((Excluded(from), Unbounded));
let iter = candidates
.filter_map(|id| self.get(txn, id).transpose());
pub mod test {
use itertools::Itertools;
use meilisearch_types::index_uid::IndexUid;
use milli::heed::EnvOpenOptions;
use nelson::Mocker;
use tempfile::TempDir;
use crate::tasks::task::TaskContent;
use super::*;
/// TODO: use this mock to test the task store properly.
pub enum MockStore {
pub struct TmpEnv(TempDir, Arc<milli::heed::Env>);
impl TmpEnv {
pub fn env(&self) -> Arc<milli::heed::Env> {
pub fn tmp_env() -> TmpEnv {
let tmp = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100000);
let env = Arc::new(options.open(tmp.path()).unwrap());
TmpEnv(tmp, env)
impl MockStore {
pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
pub fn wtxn(&self) -> Result<RwTxn> {
match self {
MockStore::Real(index) => index.wtxn(),
MockStore::Fake(_) => todo!(),
pub fn rtxn(&self) -> Result<RoTxn> {
match self {
MockStore::Real(index) => index.rtxn(),
MockStore::Fake(_) => todo!(),
pub fn next_task_id(&self, txn: &mut RwTxn) -> Result<TaskId> {
match self {
MockStore::Real(index) => index.next_task_id(txn),
MockStore::Fake(_) => todo!(),
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
match self {
MockStore::Real(index) => index.put(txn, task),
MockStore::Fake(_) => todo!(),
pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result<Option<Task>> {
match self {
MockStore::Real(index) => index.get(txn, id),
MockStore::Fake(_) => todo!(),
pub fn fetch_unfinished_tasks(
txn: &RoTxn,
from: Option<TaskId>,
) -> Result<Vec<Task>> {
match self {
MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from),
MockStore::Fake(_) => todo!(),
pub fn list_tasks(
txn: &RoTxn,
from: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,
) -> Result<Vec<Task>> {
match self {
MockStore::Real(index) => index.list_tasks(txn, from, filter, limit),
MockStore::Fake(_) => todo!(),
fn test_ordered_filtered_updates() {
let tmp = tmp_env();
let store = Store::new(tmp.env()).unwrap();
let tasks = (0..100)
.map(|_| Task {
id: rand::random(),
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
events: vec![],
let mut txn = store.env.write_txn().unwrap();
.try_for_each(|t| store.put(&mut txn, t))
let mut filter = TaskFilter::default();
let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
.map(|t| t.id)
.all(|(a, b)| a > b));
fn test_filter_same_index_prefix() {
let tmp = tmp_env();
let store = Store::new(tmp.env()).unwrap();
let task_1 = Task {
id: 1,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
events: vec![],
let task_2 = Task {
id: 0,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test1"),
events: vec![],
let mut txn = store.wtxn().unwrap();
store.put(&mut txn, &task_1).unwrap();
store.put(&mut txn, &task_2).unwrap();
let mut filter = TaskFilter::default();
let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test");
// same thing but invert the ids
let task_1 = Task {
id: 0,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
events: vec![],
let task_2 = Task {
id: 1,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test1"),
events: vec![],
let mut txn = store.wtxn().unwrap();
store.put(&mut txn, &task_1).unwrap();
store.put(&mut txn, &task_2).unwrap();
let mut filter = TaskFilter::default();
let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test");
Reference in New Issue
Block a user