Merge pull request #5670 from meilisearch/export-and-transfer-route

Introduce a new route to export indexes
Committed by Mubelotix on 2025-07-01 14:37:02 +00:00 (via GitHub)
commit 8fef48f8ca
36 changed files with 990 additions and 27 deletions
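As a usage sketch (not part of the diff; the host, key, and filter values are placeholders assembled from the `Export` payload definition below), triggering an export looks like:

curl \
  -X POST 'http://localhost:7700/export' \
  -H 'Authorization: Bearer <admin-api-key>' \
  -H 'Content-Type: application/json' \
  --data-binary '{
    "url": "https://ms-1234.heaven.meilisearch.com",
    "apiKey": "1234abcd",
    "payloadSize": "24MiB",
    "indexes": { "movies-*": { "filter": "genres = action", "overrideSettings": true } }
  }'

The route answers with a summarized `export` task, and the transfer itself runs asynchronously in the task queue.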

Cargo.lock generated
View File

@@ -2997,6 +2997,7 @@ name = "index-scheduler"
version = "1.15.2"
dependencies = [
 "anyhow",
+ "backoff",
 "big_s",
 "bincode",
 "bumpalo",
@@ -3854,6 +3855,7 @@ dependencies = [
 "anyhow",
 "bumpalo",
 "bumparaw-collections",
+ "byte-unit",
 "convert_case 0.8.0",
 "csv",
 "deserr",

View File

@@ -1,12 +1,17 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::wrong_self_convention)]

+use std::collections::BTreeMap;
+
use meilisearch_types::batches::BatchId;
+use meilisearch_types::byte_unit::Byte;
use meilisearch_types::error::ResponseError;
use meilisearch_types::keys::Key;
use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::Unchecked;
-use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task, TaskId};
+use meilisearch_types::tasks::{
+    Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId,
+};
use meilisearch_types::InstanceUid;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
@@ -141,6 +146,12 @@ pub enum KindDump {
        instance_uid: Option<InstanceUid>,
    },
    SnapshotCreation,
+    Export {
+        url: String,
+        api_key: Option<String>,
+        payload_size: Option<Byte>,
+        indexes: BTreeMap<String, ExportIndexSettings>,
+    },
    UpgradeDatabase {
        from: (u32, u32, u32),
    },
@@ -213,6 +224,15 @@ impl From<KindWithContent> for KindDump {
                KindDump::DumpCreation { keys, instance_uid }
            }
            KindWithContent::SnapshotCreation => KindDump::SnapshotCreation,
+            KindWithContent::Export { url, api_key, payload_size, indexes } => KindDump::Export {
+                url,
+                api_key,
+                payload_size,
+                indexes: indexes
+                    .into_iter()
+                    .map(|(pattern, settings)| (pattern.to_string(), settings))
+                    .collect(),
+            },
            KindWithContent::UpgradeDatabase { from: version } => {
                KindDump::UpgradeDatabase { from: version }
            }

View File

@@ -44,6 +44,7 @@ time = { version = "0.3.41", features = [
tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
+backoff = "0.4.0"

[dev-dependencies]
big_s = "1.0.2"

View File

@@ -4,6 +4,7 @@ use std::io;
use dump::{KindDump, TaskDump, UpdateFile};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::RwTxn;
+use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::milli;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
@@ -211,6 +212,23 @@ impl<'a> Dump<'a> {
                    KindWithContent::DumpCreation { keys, instance_uid }
                }
                KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
+                KindDump::Export { url, api_key, payload_size, indexes } => {
+                    KindWithContent::Export {
+                        url,
+                        api_key,
+                        payload_size,
+                        indexes: indexes
+                            .into_iter()
+                            .map(|(pattern, settings)| {
+                                Ok((
+                                    IndexUidPattern::try_from(pattern)
+                                        .map_err(|_| Error::CorruptedDump)?,
+                                    settings,
+                                ))
+                            })
+                            .collect::<Result<_, Error>>()?,
+                    }
+                }
                KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
            },
        };

View File

@@ -151,6 +151,10 @@ pub enum Error {
    CorruptedTaskQueue,
    #[error(transparent)]
    DatabaseUpgrade(Box<Self>),
+    #[error(transparent)]
+    Export(Box<Self>),
+    #[error("Failed to export documents to remote server {code} ({type}): {message} <{link}>")]
+    FromRemoteWhenExporting { message: String, code: String, r#type: String, link: String },
    #[error("Failed to rollback for index `{index}`: {rollback_outcome} ")]
    RollbackFailed { index: String, rollback_outcome: RollbackOutcome },
    #[error(transparent)]
@@ -212,6 +216,7 @@ impl Error {
            | Error::BatchNotFound(_)
            | Error::TaskDeletionWithEmptyQuery
            | Error::TaskCancelationWithEmptyQuery
+            | Error::FromRemoteWhenExporting { .. }
            | Error::AbortedTask
            | Error::Dump(_)
            | Error::Heed(_)
@@ -221,6 +226,7 @@ impl Error {
            | Error::IoError(_)
            | Error::Persist(_)
            | Error::FeatureNotEnabled(_)
+            | Error::Export(_)
            | Error::Anyhow(_) => true,
            Error::CreateBatch(_)
            | Error::CorruptedTaskQueue
@@ -282,6 +288,7 @@ impl ErrorCode for Error {
            Error::Dump(e) => e.error_code(),
            Error::Milli { error, .. } => error.error_code(),
            Error::ProcessBatchPanicked(_) => Code::Internal,
+            Error::FromRemoteWhenExporting { .. } => Code::Internal,
            Error::Heed(e) => e.error_code(),
            Error::HeedTransaction(e) => e.error_code(),
            Error::FileStore(e) => e.error_code(),
@@ -294,6 +301,7 @@ impl ErrorCode for Error {
            Error::CorruptedTaskQueue => Code::Internal,
            Error::CorruptedDump => Code::Internal,
            Error::DatabaseUpgrade(_) => Code::Internal,
+            Error::Export(_) => Code::Internal,
            Error::RollbackFailed { .. } => Code::Internal,
            Error::UnrecoverableError(_) => Code::Internal,
            Error::IndexSchedulerVersionMismatch { .. } => Code::Internal,

View File

@@ -289,6 +289,9 @@ fn snapshot_details(d: &Details) -> String {
        Details::IndexSwap { swaps } => {
            format!("{{ swaps: {swaps:?} }}")
        }
+        Details::Export { url, api_key, payload_size, indexes } => {
+            format!("{{ url: {url:?}, api_key: {api_key:?}, payload_size: {payload_size:?}, indexes: {indexes:?} }}")
+        }
        Details::UpgradeDatabase { from, to } => {
            format!("{{ from: {from:?}, to: {to:?} }}")
        }

View File

@@ -175,8 +175,17 @@ make_enum_progress! {
    }
}

+make_enum_progress! {
+    pub enum Export {
+        EnsuringCorrectnessOfTheTarget,
+        ExportingTheSettings,
+        ExportingTheDocuments,
+    }
+}
+
make_atomic_progress!(Task alias AtomicTaskStep => "task" );
make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
+make_atomic_progress!(Index alias AtomicIndexStep => "index" );
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );

View File

@@ -71,6 +71,7 @@ impl From<KindWithContent> for AutobatchKind {
            KindWithContent::TaskCancelation { .. }
            | KindWithContent::TaskDeletion { .. }
            | KindWithContent::DumpCreation { .. }
+            | KindWithContent::Export { .. }
            | KindWithContent::UpgradeDatabase { .. }
            | KindWithContent::SnapshotCreation => {
                panic!("The autobatcher should never be called with tasks that don't apply to an index.")

View File

@@ -48,6 +48,9 @@ pub(crate) enum Batch {
    IndexSwap {
        task: Task,
    },
+    Export {
+        task: Task,
+    },
    UpgradeDatabase {
        tasks: Vec<Task>,
    },
@@ -104,6 +107,7 @@ impl Batch {
            Batch::TaskCancelation { task, .. }
            | Batch::Dump(task)
            | Batch::IndexCreation { task, .. }
+            | Batch::Export { task }
            | Batch::IndexUpdate { task, .. } => {
                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
            }
@@ -143,6 +147,7 @@ impl Batch {
            | TaskDeletions(_)
            | SnapshotCreation(_)
            | Dump(_)
+            | Export { .. }
            | UpgradeDatabase { .. }
            | IndexSwap { .. } => None,
            IndexOperation { op, .. } => Some(op.index_uid()),
@@ -168,6 +173,7 @@ impl fmt::Display for Batch {
            Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
            Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
            Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
+            Batch::Export { .. } => f.write_str("Export")?,
            Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
        };
        match index_uid {
@@ -427,9 +433,10 @@ impl IndexScheduler {
    /// 0. We get the *last* task to cancel.
    /// 1. We get the tasks to upgrade.
    /// 2. We get the *next* task to delete.
-    /// 3. We get the *next* snapshot to process.
-    /// 4. We get the *next* dump to process.
-    /// 5. We get the *next* tasks to process for a specific index.
+    /// 3. We get the *next* export to process.
+    /// 4. We get the *next* snapshot to process.
+    /// 5. We get the *next* dump to process.
+    /// 6. We get the *next* tasks to process for a specific index.
    #[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
    pub(crate) fn create_next_batch(
        &self,
@@ -501,7 +508,17 @@ impl IndexScheduler {
            return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
        }

-        // 3. we batch the snapshot.
+        // 3. we batch the export.
+        let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued;
+        if !to_export.is_empty() {
+            let task_id = to_export.iter().next().expect("There must be at least one export task");
+            let mut task = self.queue.tasks.get_task(rtxn, task_id)?.unwrap();
+            current_batch.processing([&mut task]);
+            current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::Export });
+            return Ok(Some((Batch::Export { task }, current_batch)));
+        }
+
+        // 4. we batch the snapshot.
        let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
        if !to_snapshot.is_empty() {
            let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
@@ -511,7 +528,7 @@ impl IndexScheduler {
            return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
        }

-        // 4. we batch the dumps.
+        // 5. we batch the dumps.
        let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
        if let Some(to_dump) = to_dump.min() {
            let mut task =
@@ -524,7 +541,7 @@ impl IndexScheduler {
            return Ok(Some((Batch::Dump(task), current_batch)));
        }

-        // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
+        // 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
        let mut task =
            self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

View File

@@ -4,6 +4,7 @@ mod autobatcher_test;
mod create_batch;
mod process_batch;
mod process_dump_creation;
+mod process_export;
mod process_index_operation;
mod process_snapshot_creation;
mod process_upgrade;

View File

@@ -368,6 +368,46 @@ impl IndexScheduler {
                task.status = Status::Succeeded;
                Ok((vec![task], ProcessBatchInfo::default()))
            }
+            Batch::Export { mut task } => {
+                let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind
+                else {
+                    unreachable!()
+                };
+                let ret = catch_unwind(AssertUnwindSafe(|| {
+                    self.process_export(
+                        url,
+                        api_key.as_deref(),
+                        payload_size.as_ref(),
+                        indexes,
+                        progress,
+                    )
+                }));
+
+                let stats = match ret {
+                    Ok(Ok(stats)) => stats,
+                    Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask),
+                    Ok(Err(e)) => return Err(Error::Export(Box::new(e))),
+                    Err(e) => {
+                        let msg = match e.downcast_ref::<&'static str>() {
+                            Some(s) => *s,
+                            None => match e.downcast_ref::<String>() {
+                                Some(s) => &s[..],
+                                None => "Box<dyn Any>",
+                            },
+                        };
+                        return Err(Error::Export(Box::new(Error::ProcessBatchPanicked(
+                            msg.to_string(),
+                        ))));
+                    }
+                };
+
+                task.status = Status::Succeeded;
+                if let Some(Details::Export { indexes, .. }) = task.details.as_mut() {
+                    *indexes = stats;
+                }
+
+                Ok((vec![task], ProcessBatchInfo::default()))
+            }
            Batch::UpgradeDatabase { mut tasks } => {
                let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
                    unreachable!();
@@ -715,9 +755,11 @@ impl IndexScheduler {
                        from.1,
                        from.2
                    );
-                    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+                    let ret = catch_unwind(std::panic::AssertUnwindSafe(|| {
                        self.process_rollback(from, progress)
-                    })) {
+                    }));
+                    match ret {
                        Ok(Ok(())) => {}
                        Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))),
                        Err(e) => {

View File

@@ -0,0 +1,373 @@
use std::collections::BTreeMap;
use std::io::{self, Write as _};
use std::sync::atomic;
use std::time::Duration;

use backoff::ExponentialBackoff;
use byte_unit::Byte;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::update::{request_threads, Setting};
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{self, obkv_to_json, Filter, InternalError};
use meilisearch_types::settings::{self, SecretPolicy};
use meilisearch_types::tasks::{DetailsExportIndexSettings, ExportIndexSettings};
use serde::Deserialize;
use ureq::{json, Response};

use super::MustStopProcessing;
use crate::processing::AtomicDocumentStep;
use crate::{Error, IndexScheduler, Result};

impl IndexScheduler {
    pub(super) fn process_export(
        &self,
        base_url: &str,
        api_key: Option<&str>,
        payload_size: Option<&Byte>,
        indexes: &BTreeMap<IndexUidPattern, ExportIndexSettings>,
        progress: Progress,
    ) -> Result<BTreeMap<IndexUidPattern, DetailsExportIndexSettings>> {
        #[cfg(test)]
        self.maybe_fail(crate::test_utils::FailureLocation::ProcessExport)?;

        let indexes: Vec<_> = self
            .index_names()?
            .into_iter()
            .flat_map(|uid| {
                indexes
                    .iter()
                    .find(|(pattern, _)| pattern.matches_str(&uid))
                    .map(|(pattern, settings)| (pattern, uid, settings))
            })
            .collect();

        let mut output = BTreeMap::new();
        let agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
        let must_stop_processing = self.scheduler.must_stop_processing.clone();
        for (i, (_pattern, uid, export_settings)) in indexes.iter().enumerate() {
            if must_stop_processing.get() {
                return Err(Error::AbortedTask);
            }

            progress.update_progress(VariableNameStep::<ExportIndex>::new(
                format!("Exporting index `{uid}`"),
                i as u32,
                indexes.len() as u32,
            ));

            let ExportIndexSettings { filter, override_settings } = export_settings;
            let index = self.index(uid)?;
            let index_rtxn = index.read_txn()?;

            // First, check if the index already exists
            let url = format!("{base_url}/indexes/{uid}");
            let response = retry(&must_stop_processing, || {
                let mut request = agent.get(&url);
                if let Some(api_key) = api_key {
                    request = request.set("Authorization", &format!("Bearer {api_key}"));
                }

                request.send_bytes(Default::default()).map_err(into_backoff_error)
            });
            let index_exists = match response {
                Ok(response) => response.status() == 200,
                Err(Error::FromRemoteWhenExporting { code, .. }) if code == "index_not_found" => {
                    false
                }
                Err(e) => return Err(e),
            };

            let primary_key = index
                .primary_key(&index_rtxn)
                .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;

            // Create the index
            if !index_exists {
                let url = format!("{base_url}/indexes");
                retry(&must_stop_processing, || {
                    let mut request = agent.post(&url);
                    if let Some(api_key) = api_key {
                        request = request.set("Authorization", &format!("Bearer {api_key}"));
                    }
                    let index_param = json!({ "uid": uid, "primaryKey": primary_key });
                    request.send_json(&index_param).map_err(into_backoff_error)
                })?;
            }

            // Patch the index primary key
            if index_exists && *override_settings {
                let url = format!("{base_url}/indexes/{uid}");
                retry(&must_stop_processing, || {
                    let mut request = agent.patch(&url);
                    if let Some(api_key) = api_key {
                        request = request.set("Authorization", &format!("Bearer {api_key}"));
                    }
                    let index_param = json!({ "primaryKey": primary_key });
                    request.send_json(&index_param).map_err(into_backoff_error)
                })?;
            }

            // Send the index settings
            if !index_exists || *override_settings {
                let mut settings =
                    settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets)
                        .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
                // Remove the experimental chat setting if not enabled
                if self.features().check_chat_completions("exporting chat settings").is_err() {
                    settings.chat = Setting::NotSet;
                }
                // Retry logic for sending settings
                let url = format!("{base_url}/indexes/{uid}/settings");
                let bearer = api_key.map(|api_key| format!("Bearer {api_key}"));
                retry(&must_stop_processing, || {
                    let mut request = agent.patch(&url);
                    if let Some(bearer) = bearer.as_ref() {
                        request = request.set("Authorization", bearer);
                    }

                    request.send_json(settings.clone()).map_err(into_backoff_error)
                })?;
            }

            let filter = filter
                .as_ref()
                .map(Filter::from_json)
                .transpose()
                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
                .flatten();

            let filter_universe = filter
                .map(|f| f.evaluate(&index_rtxn, &index))
                .transpose()
                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
            let whole_universe = index
                .documents_ids(&index_rtxn)
                .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
            let universe = filter_universe.unwrap_or(whole_universe);

            let fields_ids_map = index.fields_ids_map(&index_rtxn)?;
            let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
            let embedding_configs = index
                .embedding_configs(&index_rtxn)
                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

            // We don't need to keep this one alive as we will
            // spawn many threads to process the documents
            drop(index_rtxn);

            let total_documents = universe.len() as u32;
            let (step, progress_step) = AtomicDocumentStep::new(total_documents);
            progress.update_progress(progress_step);

            output.insert(
                IndexUidPattern::new_unchecked(uid.clone()),
                DetailsExportIndexSettings {
                    settings: (*export_settings).clone(),
                    matched_documents: Some(total_documents as u64),
                },
            );

            let limit = payload_size.map(|ps| ps.as_u64() as usize).unwrap_or(50 * 1024 * 1024); // defaults to 50 MiB
            let documents_url = format!("{base_url}/indexes/{uid}/documents");

            request_threads()
                .broadcast(|ctx| {
                    let index_rtxn = index
                        .read_txn()
                        .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;

                    let mut buffer = Vec::new();
                    let mut tmp_buffer = Vec::new();
                    let mut compressed_buffer = Vec::new();
                    for (i, docid) in universe.iter().enumerate() {
                        if i % ctx.num_threads() != ctx.index() {
                            continue;
                        }

                        let document = index
                            .document(&index_rtxn, docid)
                            .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

                        let mut document = obkv_to_json(&all_fields, &fields_ids_map, document)
                            .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

                        // TODO definitely factorize this code
                        'inject_vectors: {
                            let embeddings = index
                                .embeddings(&index_rtxn, docid)
                                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

                            if embeddings.is_empty() {
                                break 'inject_vectors;
                            }

                            let vectors = document
                                .entry(RESERVED_VECTORS_FIELD_NAME)
                                .or_insert(serde_json::Value::Object(Default::default()));

                            let serde_json::Value::Object(vectors) = vectors else {
                                return Err(Error::from_milli(
                                    milli::Error::UserError(
                                        milli::UserError::InvalidVectorsMapType {
                                            document_id: {
                                                if let Ok(Some(Ok(index))) = index
                                                    .external_id_of(
                                                        &index_rtxn,
                                                        std::iter::once(docid),
                                                    )
                                                    .map(|it| it.into_iter().next())
                                                {
                                                    index
                                                } else {
                                                    format!("internal docid={docid}")
                                                }
                                            },
                                            value: vectors.clone(),
                                        },
                                    ),
                                    Some(uid.to_string()),
                                ));
                            };

                            for (embedder_name, embeddings) in embeddings {
                                let user_provided = embedding_configs
                                    .iter()
                                    .find(|conf| conf.name == embedder_name)
                                    .is_some_and(|conf| conf.user_provided.contains(docid));

                                let embeddings = ExplicitVectors {
                                    embeddings: Some(
                                        VectorOrArrayOfVectors::from_array_of_vectors(embeddings),
                                    ),
                                    regenerate: !user_provided,
                                };
                                vectors.insert(
                                    embedder_name,
                                    serde_json::to_value(embeddings).unwrap(),
                                );
                            }
                        }

                        tmp_buffer.clear();
                        serde_json::to_writer(&mut tmp_buffer, &document)
                            .map_err(milli::InternalError::from)
                            .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;

                        // Make sure we put at least one document in the buffer even
                        // though we might go above the buffer limit before sending
                        if !buffer.is_empty() && buffer.len() + tmp_buffer.len() > limit {
                            // We compress the documents before sending them
                            let mut encoder =
                                GzEncoder::new(&mut compressed_buffer, Compression::default());
                            encoder
                                .write_all(&buffer)
                                .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?;
                            encoder
                                .finish()
                                .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?;

                            retry(&must_stop_processing, || {
                                let mut request = agent.post(&documents_url);
                                request = request.set("Content-Type", "application/x-ndjson");
                                request = request.set("Content-Encoding", "gzip");
                                if let Some(api_key) = api_key {
                                    request = request
                                        .set("Authorization", &(format!("Bearer {api_key}")));
                                }
                                request.send_bytes(&compressed_buffer).map_err(into_backoff_error)
                            })?;
                            buffer.clear();
                            compressed_buffer.clear();
                        }
                        buffer.extend_from_slice(&tmp_buffer);

                        if i % 100 == 0 {
                            step.fetch_add(100, atomic::Ordering::Relaxed);
                        }
                    }

                    retry(&must_stop_processing, || {
                        let mut request = agent.post(&documents_url);
                        request = request.set("Content-Type", "application/x-ndjson");
                        if let Some(api_key) = api_key {
                            request = request.set("Authorization", &(format!("Bearer {api_key}")));
                        }
                        request.send_bytes(&buffer).map_err(into_backoff_error)
                    })?;

                    Ok(())
                })
                .map_err(|e| {
                    Error::from_milli(
                        milli::Error::InternalError(InternalError::PanicInThreadPool(e)),
                        Some(uid.to_string()),
                    )
                })?;
            step.store(total_documents, atomic::Ordering::Relaxed);
        }

        Ok(output)
    }
}

fn retry<F>(must_stop_processing: &MustStopProcessing, send_request: F) -> Result<ureq::Response>
where
    F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>,
{
    match backoff::retry(ExponentialBackoff::default(), || {
        if must_stop_processing.get() {
            return Err(backoff::Error::Permanent(ureq::Error::Status(
                u16::MAX,
                // 444: Connection Closed Without Response
                Response::new(444, "Abort", "Aborted task").unwrap(),
            )));
        }
        send_request()
    }) {
        Ok(response) => Ok(response),
        Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)),
        Err(backoff::Error::Transient { err, retry_after: _ }) => Err(ureq_error_into_error(err)),
    }
}

fn into_backoff_error(err: ureq::Error) -> backoff::Error<ureq::Error> {
    match err {
        // These status codes must trigger an automatic retry
        // <https://www.restapitutorial.com/advanced/responses/retries>
        ureq::Error::Status(408 | 429 | 500 | 502 | 503 | 504, _) => {
            backoff::Error::Transient { err, retry_after: None }
        }
        ureq::Error::Status(_, _) => backoff::Error::Permanent(err),
        ureq::Error::Transport(_) => backoff::Error::Transient { err, retry_after: None },
    }
}

/// Converts a `ureq::Error` into an `Error`.
fn ureq_error_into_error(error: ureq::Error) -> Error {
    #[derive(Deserialize)]
    struct MeiliError {
        message: String,
        code: String,
        r#type: String,
        link: String,
    }

    match error {
        // This is a workaround to handle task abortion - the error propagation path
        // makes it difficult to cleanly surface the abortion at this level.
        ureq::Error::Status(u16::MAX, _) => Error::AbortedTask,
        ureq::Error::Status(_, response) => match response.into_json() {
            Ok(MeiliError { message, code, r#type, link }) => {
                Error::FromRemoteWhenExporting { message, code, r#type, link }
            }
            Err(e) => e.into(),
        },
        ureq::Error::Transport(transport) => io::Error::new(io::ErrorKind::Other, transport).into(),
    }
}

enum ExportIndex {}
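The flush condition in the loop above is easy to misread, so here is a minimal standalone sketch of the same size-capped batching pattern (illustrative only, with a hypothetical `send` callback; the real code additionally gzips the buffer before posting):

/// Append NDJSON lines to a buffer and flush whenever appending the next
/// line would push the buffer past `limit`, so every flush carries at
/// least one line even if a single line exceeds the limit.
fn batch_by_size(lines: &[Vec<u8>], limit: usize, mut send: impl FnMut(&[u8])) {
    let mut buffer: Vec<u8> = Vec::new();
    for line in lines {
        if !buffer.is_empty() && buffer.len() + line.len() > limit {
            send(&buffer);
            buffer.clear();
        }
        buffer.extend_from_slice(line);
    }
    if !buffer.is_empty() {
        send(&buffer); // final partial batch
    }
}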

View File

@@ -732,6 +732,7 @@ fn basic_get_stats() {
    "documentDeletion": 0,
    "documentEdition": 0,
    "dumpCreation": 0,
+   "export": 0,
    "indexCreation": 3,
    "indexDeletion": 0,
    "indexSwap": 0,
@@ -765,6 +766,7 @@ fn basic_get_stats() {
    "documentDeletion": 0,
    "documentEdition": 0,
    "dumpCreation": 0,
+   "export": 0,
    "indexCreation": 3,
    "indexDeletion": 0,
    "indexSwap": 0,
@@ -805,6 +807,7 @@ fn basic_get_stats() {
    "documentDeletion": 0,
    "documentEdition": 0,
    "dumpCreation": 0,
+   "export": 0,
    "indexCreation": 3,
    "indexDeletion": 0,
    "indexSwap": 0,
@@ -846,6 +849,7 @@ fn basic_get_stats() {
    "documentDeletion": 0,
    "documentEdition": 0,
    "dumpCreation": 0,
+   "export": 0,
    "indexCreation": 3,
    "indexDeletion": 0,
    "indexSwap": 0,

View File

@@ -37,6 +37,7 @@ pub(crate) enum FailureLocation {
    InsideCreateBatch,
    InsideProcessBatch,
    PanicInsideProcessBatch,
+    ProcessExport,
    ProcessUpgrade,
    AcquiringWtxn,
    UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 },

View File

@@ -278,6 +278,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
        K::TaskCancelation { .. }
        | K::TaskDeletion { .. }
        | K::DumpCreation { .. }
+        | K::Export { .. }
        | K::UpgradeDatabase { .. }
        | K::SnapshotCreation => (),
    };
@@ -605,6 +606,9 @@ impl crate::IndexScheduler {
                Details::Dump { dump_uid: _ } => {
                    assert_eq!(kind.as_kind(), Kind::DumpCreation);
                }
+                Details::Export { url: _, api_key: _, payload_size: _, indexes: _ } => {
+                    assert_eq!(kind.as_kind(), Kind::Export);
+                }
                Details::UpgradeDatabase { from: _, to: _ } => {
                    assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
                }

View File

@@ -15,6 +15,7 @@ actix-web = { version = "4.11.0", default-features = false }
anyhow = "1.0.98"
bumpalo = "3.18.1"
bumparaw-collections = "0.1.4"
+byte-unit = { version = "5.1.6", features = ["serde"] }
convert_case = "0.8.0"
csv = "1.3.1"
deserr = { version = "0.6.3", features = ["actix-web"] }

View File

@@ -389,6 +389,13 @@ InvalidDocumentEditionContext , InvalidRequest , BAD_REQUEST ;
InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ;
EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST ;
InvalidSettingsIndexChat , InvalidRequest , BAD_REQUEST ;
+// Export
+InvalidExportUrl , InvalidRequest , BAD_REQUEST ;
+InvalidExportApiKey , InvalidRequest , BAD_REQUEST ;
+InvalidExportPayloadSize , InvalidRequest , BAD_REQUEST ;
+InvalidExportIndexesPatterns , InvalidRequest , BAD_REQUEST ;
+InvalidExportIndexFilter , InvalidRequest , BAD_REQUEST ;
+InvalidExportIndexOverrideSettings , InvalidRequest , BAD_REQUEST ;
// Experimental features - Chat Completions
UnimplementedExternalFunctionCalling , InvalidRequest , NOT_IMPLEMENTED ;
UnimplementedNonStreamingChatCompletions , InvalidRequest , NOT_IMPLEMENTED ;

View File

@@ -12,7 +12,7 @@ use crate::index_uid::{IndexUid, IndexUidFormatError};
/// An index uid pattern is composed of only ascii alphanumeric characters, - and _, between 1 and 400
/// bytes long and optionally ending with a *.
-#[derive(Serialize, Deserialize, Deserr, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Deserr, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[deserr(try_from(&String) = FromStr::from_str -> IndexUidPatternFormatError)]
pub struct IndexUidPattern(String);

View File

@@ -317,6 +317,9 @@ pub enum Action {
    #[serde(rename = "experimental.update")]
    #[deserr(rename = "experimental.update")]
    ExperimentalFeaturesUpdate,
+    #[serde(rename = "export")]
+    #[deserr(rename = "export")]
+    Export,
    #[serde(rename = "network.get")]
    #[deserr(rename = "network.get")]
    NetworkGet,
@@ -438,6 +441,8 @@ pub mod actions {
    pub const EXPERIMENTAL_FEATURES_GET: u8 = ExperimentalFeaturesGet.repr();
    pub const EXPERIMENTAL_FEATURES_UPDATE: u8 = ExperimentalFeaturesUpdate.repr();
+    pub const EXPORT: u8 = Export.repr();
    pub const NETWORK_GET: u8 = NetworkGet.repr();
    pub const NETWORK_UPDATE: u8 = NetworkUpdate.repr();

View File

@@ -18,7 +18,7 @@ pub mod versioning;
pub use milli::{heed, Index};
use uuid::Uuid;
pub use versioning::VERSION_FILE_NAME;
-pub use {milli, serde_cs};
+pub use {byte_unit, milli, serde_cs};

pub type Document = serde_json::Map<String, serde_json::Value>;
pub type InstanceUid = Uuid;

View File

@@ -969,6 +969,7 @@ pub fn settings(
    if let SecretPolicy::HideSecrets = secret_policy {
        settings.hide_secrets()
    }
+
    Ok(settings)
}

View File

@@ -1,3 +1,6 @@
+use std::collections::BTreeMap;
+
+use byte_unit::UnitType;
use milli::Object;
use serde::{Deserialize, Serialize};
use time::{Duration, OffsetDateTime};
@@ -6,7 +9,9 @@ use utoipa::ToSchema;
use crate::batches::BatchId;
use crate::error::ResponseError;
use crate::settings::{Settings, Unchecked};
-use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
+use crate::tasks::{
+    serialize_duration, Details, DetailsExportIndexSettings, IndexSwap, Kind, Status, Task, TaskId,
+};

#[derive(Debug, Clone, PartialEq, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -118,6 +123,15 @@ pub struct DetailsView {
    pub upgrade_from: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upgrade_to: Option<String>,
+    // exporting
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub url: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub api_key: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub payload_size: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub indexes: Option<BTreeMap<String, DetailsExportIndexSettings>>,
}

impl DetailsView {
@@ -238,6 +252,34 @@ impl DetailsView {
                    Some(left)
                }
            },
+            url: match (self.url.clone(), other.url.clone()) {
+                (None, None) => None,
+                (None, Some(url)) | (Some(url), None) => Some(url),
+                // We should never be able to batch multiple exports at the same time.
+                // So we return the first one we encounter but that shouldn't be an issue anyway.
+                (Some(left), Some(_right)) => Some(left),
+            },
+            api_key: match (self.api_key.clone(), other.api_key.clone()) {
+                (None, None) => None,
+                (None, Some(key)) | (Some(key), None) => Some(key),
+                // We should never be able to batch multiple exports at the same time.
+                // So we return the first one we encounter but that shouldn't be an issue anyway.
+                (Some(left), Some(_right)) => Some(left),
+            },
+            payload_size: match (self.payload_size.clone(), other.payload_size.clone()) {
+                (None, None) => None,
+                (None, Some(size)) | (Some(size), None) => Some(size),
+                // We should never be able to batch multiple exports at the same time.
+                // So we return the first one we encounter but that shouldn't be an issue anyway.
+                (Some(left), Some(_right)) => Some(left),
+            },
+            indexes: match (self.indexes.clone(), other.indexes.clone()) {
+                (None, None) => None,
+                (None, Some(indexes)) | (Some(indexes), None) => Some(indexes),
+                // We should never be able to batch multiple exports at the same time.
+                // So we return the first one we encounter but that shouldn't be an issue anyway.
+                (Some(left), Some(_right)) => Some(left),
+            },
            // We want the earliest version
            upgrade_from: match (self.upgrade_from.clone(), other.upgrade_from.clone()) {
                (None, None) => None,
@@ -327,6 +369,22 @@ impl From<Details> for DetailsView {
            Details::IndexSwap { swaps } => {
                DetailsView { swaps: Some(swaps), ..Default::default() }
            }
+            Details::Export { url, api_key, payload_size, indexes } => DetailsView {
+                url: Some(url),
+                api_key: api_key.map(|mut api_key| {
+                    hide_secret(&mut api_key);
+                    api_key
+                }),
+                payload_size: payload_size
+                    .map(|ps| ps.get_appropriate_unit(UnitType::Both).to_string()),
+                indexes: Some(
+                    indexes
+                        .into_iter()
+                        .map(|(pattern, settings)| (pattern.to_string(), settings))
+                        .collect(),
+                ),
+                ..Default::default()
+            },
            Details::UpgradeDatabase { from, to } => DetailsView {
                upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)),
                upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)),
@@ -335,3 +393,21 @@ impl From<Details> for DetailsView {
        }
    }
}
+
+// We definitely need to factorize the code to hide the secret key
+fn hide_secret(secret: &mut String) {
+    match secret.len() {
+        x if x < 10 => {
+            secret.replace_range(.., "XXX...");
+        }
+        x if x < 20 => {
+            secret.replace_range(2.., "XXXX...");
+        }
+        x if x < 30 => {
+            secret.replace_range(3.., "XXXXX...");
+        }
+        _x => {
+            secret.replace_range(5.., "XXXXXX...");
+        }
+    }
+}
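To make the masking concrete (illustrative values, not part of the diff): an 8-character key becomes `XXX...`, a 12-character key such as `abcdefghijkl` becomes `abXXXX...`, and a 32-character key keeps its first five characters followed by `XXXXXX...`.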

View File

@@ -1,19 +1,22 @@
use core::fmt;
-use std::collections::HashSet;
+use std::collections::{BTreeMap, HashSet};
use std::fmt::{Display, Write};
use std::str::FromStr;

+use byte_unit::Byte;
use enum_iterator::Sequence;
use milli::update::IndexDocumentsMethod;
use milli::Object;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize, Serializer};
+use serde_json::Value;
use time::{Duration, OffsetDateTime};
-use utoipa::ToSchema;
+use utoipa::{schema, ToSchema};
use uuid::Uuid;

use crate::batches::BatchId;
use crate::error::ResponseError;
+use crate::index_uid_pattern::IndexUidPattern;
use crate::keys::Key;
use crate::settings::{Settings, Unchecked};
use crate::{versioning, InstanceUid};
@@ -50,6 +53,7 @@ impl Task {
            | SnapshotCreation
            | TaskCancelation { .. }
            | TaskDeletion { .. }
+            | Export { .. }
            | UpgradeDatabase { .. }
            | IndexSwap { .. } => None,
            DocumentAdditionOrUpdate { index_uid, .. }
@@ -86,6 +90,7 @@ impl Task {
            | KindWithContent::TaskDeletion { .. }
            | KindWithContent::DumpCreation { .. }
            | KindWithContent::SnapshotCreation
+            | KindWithContent::Export { .. }
            | KindWithContent::UpgradeDatabase { .. } => None,
        }
    }
@@ -108,11 +113,11 @@ pub enum KindWithContent {
    },
    DocumentDeletionByFilter {
        index_uid: String,
-        filter_expr: serde_json::Value,
+        filter_expr: Value,
    },
    DocumentEdition {
        index_uid: String,
-        filter_expr: Option<serde_json::Value>,
+        filter_expr: Option<Value>,
        context: Option<milli::Object>,
        function: String,
    },
@@ -152,6 +157,12 @@ pub enum KindWithContent {
        instance_uid: Option<InstanceUid>,
    },
    SnapshotCreation,
+    Export {
+        url: String,
+        api_key: Option<String>,
+        payload_size: Option<Byte>,
+        indexes: BTreeMap<IndexUidPattern, ExportIndexSettings>,
+    },
    UpgradeDatabase {
        from: (u32, u32, u32),
    },
@@ -163,6 +174,13 @@ pub struct IndexSwap {
    pub indexes: (String, String),
}

+#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ExportIndexSettings {
+    pub filter: Option<Value>,
+    pub override_settings: bool,
+}
+
impl KindWithContent {
    pub fn as_kind(&self) -> Kind {
        match self {
@@ -180,6 +198,7 @@ impl KindWithContent {
            KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion,
            KindWithContent::DumpCreation { .. } => Kind::DumpCreation,
            KindWithContent::SnapshotCreation => Kind::SnapshotCreation,
+            KindWithContent::Export { .. } => Kind::Export,
            KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase,
        }
    }
@@ -192,6 +211,7 @@ impl KindWithContent {
            | SnapshotCreation
            | TaskCancelation { .. }
            | TaskDeletion { .. }
+            | Export { .. }
            | UpgradeDatabase { .. } => vec![],
            DocumentAdditionOrUpdate { index_uid, .. }
            | DocumentEdition { index_uid, .. }
@@ -269,6 +289,14 @@ impl KindWithContent {
            }),
            KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
            KindWithContent::SnapshotCreation => None,
+            KindWithContent::Export { url, api_key, payload_size, indexes } => {
+                Some(Details::Export {
+                    url: url.clone(),
+                    api_key: api_key.clone(),
+                    payload_size: *payload_size,
+                    indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(),
+                })
+            }
            KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
                from: (from.0, from.1, from.2),
                to: (
@@ -335,6 +363,14 @@ impl KindWithContent {
            }),
            KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
            KindWithContent::SnapshotCreation => None,
+            KindWithContent::Export { url, api_key, payload_size, indexes } => {
+                Some(Details::Export {
+                    url: url.clone(),
+                    api_key: api_key.clone(),
+                    payload_size: *payload_size,
+                    indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(),
+                })
+            }
            KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
                from: *from,
                to: (
@@ -383,6 +419,14 @@ impl From<&KindWithContent> for Option<Details> {
            }),
            KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
            KindWithContent::SnapshotCreation => None,
+            KindWithContent::Export { url, api_key, payload_size, indexes } => {
+                Some(Details::Export {
+                    url: url.clone(),
+                    api_key: api_key.clone(),
+                    payload_size: *payload_size,
+                    indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(),
+                })
+            }
            KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
                from: *from,
                to: (
@@ -499,6 +543,7 @@ pub enum Kind {
    TaskDeletion,
    DumpCreation,
    SnapshotCreation,
+    Export,
    UpgradeDatabase,
}
@@ -516,6 +561,7 @@ impl Kind {
            | Kind::TaskCancelation
            | Kind::TaskDeletion
            | Kind::DumpCreation
+            | Kind::Export
            | Kind::UpgradeDatabase
            | Kind::SnapshotCreation => false,
        }
@@ -536,6 +582,7 @@ impl Display for Kind {
            Kind::TaskDeletion => write!(f, "taskDeletion"),
            Kind::DumpCreation => write!(f, "dumpCreation"),
            Kind::SnapshotCreation => write!(f, "snapshotCreation"),
+            Kind::Export => write!(f, "export"),
            Kind::UpgradeDatabase => write!(f, "upgradeDatabase"),
        }
    }
@@ -568,6 +615,8 @@ impl FromStr for Kind {
            Ok(Kind::DumpCreation)
        } else if kind.eq_ignore_ascii_case("snapshotCreation") {
            Ok(Kind::SnapshotCreation)
+        } else if kind.eq_ignore_ascii_case("export") {
+            Ok(Kind::Export)
        } else if kind.eq_ignore_ascii_case("upgradeDatabase") {
            Ok(Kind::UpgradeDatabase)
        } else {
@@ -643,12 +692,33 @@ pub enum Details {
    IndexSwap {
        swaps: Vec<IndexSwap>,
    },
+    Export {
+        url: String,
+        api_key: Option<String>,
+        payload_size: Option<Byte>,
+        indexes: BTreeMap<IndexUidPattern, DetailsExportIndexSettings>,
+    },
    UpgradeDatabase {
        from: (u32, u32, u32),
        to: (u32, u32, u32),
    },
}

+#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
+#[schema(rename_all = "camelCase")]
+pub struct DetailsExportIndexSettings {
+    #[serde(flatten)]
+    pub settings: ExportIndexSettings,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub matched_documents: Option<u64>,
+}
+
+impl From<ExportIndexSettings> for DetailsExportIndexSettings {
+    fn from(settings: ExportIndexSettings) -> Self {
+        DetailsExportIndexSettings { settings, matched_documents: None }
+    }
+}
+
impl Details {
    pub fn to_failed(&self) -> Self {
        let mut details = self.clone();
@@ -667,6 +737,7 @@ impl Details {
            Self::SettingsUpdate { .. }
            | Self::IndexInfo { .. }
            | Self::Dump { .. }
+            | Self::Export { .. }
            | Self::UpgradeDatabase { .. }
            | Self::IndexSwap { .. } => (),
        }

View File

@@ -0,0 +1,183 @@
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::str::FromStr as _;

use actix_web::web::{self, Data};
use actix_web::{HttpRequest, HttpResponse};
use byte_unit::Byte;
use deserr::actix_web::AwebJson;
use deserr::Deserr;
use index_scheduler::IndexScheduler;
use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::keys::actions;
use meilisearch_types::tasks::{ExportIndexSettings as DbExportIndexSettings, KindWithContent};
use serde::Serialize;
use serde_json::Value;
use tracing::debug;
use utoipa::{OpenApi, ToSchema};

use crate::analytics::Analytics;
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::GuardedData;
use crate::routes::export_analytics::ExportAnalytics;
use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;

#[derive(OpenApi)]
#[openapi(
    paths(export),
    tags((
        name = "Export",
        description = "The `/export` route allows you to trigger an export process to a remote Meilisearch instance.",
        external_docs(url = "https://www.meilisearch.com/docs/reference/api/export"),
    )),
)]
pub struct ExportApi;

pub fn configure(cfg: &mut web::ServiceConfig) {
    cfg.service(web::resource("").route(web::post().to(export)));
}

#[utoipa::path(
    post,
    path = "",
    tag = "Export",
    security(("Bearer" = ["export", "*"])),
    responses(
        (status = 202, description = "Export successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 1,
                "status": "enqueued",
                "type": "export",
                "enqueuedAt": "2021-08-11T09:25:53.000000Z"
            })),
        (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
            {
                "message": "The Authorization header is missing. It must use the bearer authorization method.",
                "code": "missing_authorization_header",
                "type": "auth",
                "link": "https://docs.meilisearch.com/errors#missing_authorization_header"
            }
        )),
    )
)]
async fn export(
    index_scheduler: GuardedData<ActionPolicy<{ actions::EXPORT }>, Data<IndexScheduler>>,
    export: AwebJson<Export, DeserrJsonError>,
    req: HttpRequest,
    opt: web::Data<Opt>,
    analytics: Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
    let export = export.into_inner();
    debug!(returns = ?export, "Trigger export");

    let analytics_aggregate = ExportAnalytics::from_export(&export);

    let Export { url, api_key, payload_size, indexes } = export;

    let indexes = match indexes {
        Some(indexes) => indexes
            .into_iter()
            .map(|(pattern, ExportIndexSettings { filter, override_settings })| {
                (pattern, DbExportIndexSettings { filter, override_settings })
            })
            .collect(),
        None => BTreeMap::from([(
            IndexUidPattern::new_unchecked("*"),
            DbExportIndexSettings::default(),
        )]),
    };

    let task = KindWithContent::Export {
        url,
        api_key,
        payload_size: payload_size.map(|ByteWithDeserr(bytes)| bytes),
        indexes,
    };
    let uid = get_task_id(&req, &opt)?;
    let dry_run = is_dry_run(&req, &opt)?;
    let task: SummarizedTaskView =
        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
            .await??
            .into();

    analytics.publish(analytics_aggregate, &req);

    Ok(HttpResponse::Ok().json(task))
}

#[derive(Debug, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct Export {
    #[schema(value_type = Option<String>, example = json!("https://ms-1234.heaven.meilisearch.com"))]
    #[serde(default)]
    #[deserr(default, error = DeserrJsonError<InvalidExportUrl>)]
    pub url: String,
    #[schema(value_type = Option<String>, example = json!("1234abcd"))]
    #[serde(default)]
    #[deserr(default, error = DeserrJsonError<InvalidExportApiKey>)]
    pub api_key: Option<String>,
    #[schema(value_type = Option<String>, example = json!("24MiB"))]
    #[serde(default)]
    #[deserr(default, error = DeserrJsonError<InvalidExportPayloadSize>)]
    pub payload_size: Option<ByteWithDeserr>,
    #[schema(value_type = Option<BTreeMap<String, ExportIndexSettings>>, example = json!({ "*": { "filter": null } }))]
    #[deserr(default)]
    #[serde(default)]
    pub indexes: Option<BTreeMap<IndexUidPattern, ExportIndexSettings>>,
}

/// A wrapper around the `Byte` type that implements `Deserr`.
#[derive(Debug, Serialize)]
#[serde(transparent)]
pub struct ByteWithDeserr(pub Byte);

impl<E> deserr::Deserr<E> for ByteWithDeserr
where
    E: deserr::DeserializeError,
{
    fn deserialize_from_value<V: deserr::IntoValue>(
        value: deserr::Value<V>,
        location: deserr::ValuePointerRef,
    ) -> Result<Self, E> {
        use deserr::{ErrorKind, Value, ValueKind};
        match value {
            Value::Integer(integer) => Ok(ByteWithDeserr(Byte::from_u64(integer))),
            Value::String(string) => Byte::from_str(&string).map(ByteWithDeserr).map_err(|e| {
                deserr::take_cf_content(E::error::<Infallible>(
                    None,
                    ErrorKind::Unexpected { msg: e.to_string() },
                    location,
                ))
            }),
            actual => Err(deserr::take_cf_content(E::error(
                None,
                ErrorKind::IncorrectValueKind {
                    actual,
                    accepted: &[ValueKind::Integer, ValueKind::String],
                },
                location,
            ))),
        }
    }
}

#[derive(Debug, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct ExportIndexSettings {
    #[schema(value_type = Option<String>, example = json!("genres = action"))]
    #[serde(default)]
    #[deserr(default, error = DeserrJsonError<InvalidExportIndexFilter>)]
    pub filter: Option<Value>,
    #[schema(value_type = Option<bool>, example = json!(true))]
    #[serde(default)]
    #[deserr(default, error = DeserrJsonError<InvalidExportIndexOverrideSettings>)]
    pub override_settings: bool,
}
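Because `ByteWithDeserr` accepts both integers and strings, the following request bodies are equivalent ways to set the payload limit (sketch; 24MiB = 25,165,824 bytes):

{ "payloadSize": "24MiB" }
{ "payloadSize": 25165824 }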

View File

@ -0,0 +1,93 @@
use crate::analytics::Aggregate;
use crate::routes::export::Export;
#[derive(Default)]
pub struct ExportAnalytics {
total_received: usize,
has_api_key: bool,
sum_index_patterns: usize,
sum_patterns_with_filter: usize,
sum_patterns_with_override_settings: usize,
payload_sizes: Vec<u64>,
}
impl ExportAnalytics {
pub fn from_export(export: &Export) -> Self {
let Export { url: _, api_key, payload_size, indexes } = export;
let has_api_key = api_key.is_some();
let index_patterns_count = indexes.as_ref().map_or(0, |indexes| indexes.len());
let patterns_with_filter_count = indexes.as_ref().map_or(0, |indexes| {
indexes.values().filter(|settings| settings.filter.is_some()).count()
});
let patterns_with_override_settings_count = indexes.as_ref().map_or(0, |indexes| {
indexes.values().filter(|settings| settings.override_settings).count()
});
let payload_sizes =
if let Some(crate::routes::export::ByteWithDeserr(byte_size)) = payload_size {
vec![byte_size.as_u64()]
} else {
vec![]
};
Self {
total_received: 1,
has_api_key,
sum_index_patterns: index_patterns_count,
sum_patterns_with_filter: patterns_with_filter_count,
sum_patterns_with_override_settings: patterns_with_override_settings_count,
payload_sizes,
}
}
}
impl Aggregate for ExportAnalytics {
fn event_name(&self) -> &'static str {
"Export Triggered"
}
fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
self.total_received += other.total_received;
self.has_api_key |= other.has_api_key;
self.sum_index_patterns += other.sum_index_patterns;
self.sum_patterns_with_filter += other.sum_patterns_with_filter;
self.sum_patterns_with_override_settings += other.sum_patterns_with_override_settings;
self.payload_sizes.extend(other.payload_sizes);
self
}
fn into_event(self: Box<Self>) -> serde_json::Value {
let avg_payload_size = if self.payload_sizes.is_empty() {
None
} else {
Some(self.payload_sizes.iter().sum::<u64>() / self.payload_sizes.len() as u64)
};
let avg_index_patterns = if self.total_received == 0 {
None
} else {
Some(self.sum_index_patterns as f64 / self.total_received as f64)
};
let avg_patterns_with_filter = if self.total_received == 0 {
None
} else {
Some(self.sum_patterns_with_filter as f64 / self.total_received as f64)
};
let avg_patterns_with_override_settings = if self.total_received == 0 {
None
} else {
Some(self.sum_patterns_with_override_settings as f64 / self.total_received as f64)
};
serde_json::json!({
"total_received": self.total_received,
"has_api_key": self.has_api_key,
"avg_index_patterns": avg_index_patterns,
"avg_patterns_with_filter": avg_patterns_with_filter,
"avg_patterns_with_override_settings": avg_patterns_with_override_settings,
"avg_payload_size": avg_payload_size,
})
}
}
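To make the merge semantics concrete: counts add up, `has_api_key` is OR-ed, payload sizes are concatenated, and the averages are only derived when the event is serialized. A minimal sketch (module-internal, with fields set directly for illustration):

    // Two single-export aggregates merged into one event.
    let a = Box::new(ExportAnalytics { total_received: 1, sum_index_patterns: 3, ..Default::default() });
    let b = Box::new(ExportAnalytics { total_received: 1, sum_index_patterns: 1, ..Default::default() });
    let event = a.aggregate(b).into_event();
    assert_eq!(event["avg_index_patterns"], serde_json::json!(2.0)); // (3 + 1) / 2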

View File

@ -2,6 +2,7 @@ use std::collections::BTreeMap;
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use export::Export;
use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController;
use meilisearch_types::batch_view::BatchView;
@ -54,6 +55,8 @@ mod api_key;
pub mod batches;
pub mod chats;
mod dump;
mod export;
mod export_analytics;
pub mod features;
pub mod indexes;
mod logs;
@ -84,6 +87,7 @@ mod tasks_test;
(path = "/multi-search", api = multi_search::MultiSearchApi),
(path = "/swap-indexes", api = swap_indexes::SwapIndexesApi),
(path = "/experimental-features", api = features::ExperimentalFeaturesApi),
(path = "/export", api = export::ExportApi),
(path = "/network", api = network::NetworkApi),
),
paths(get_health, get_version, get_stats),
@ -95,7 +99,7 @@ mod tasks_test;
url = "/",
description = "Local server",
)),
components(schemas(PaginationView<KeyView>, PaginationView<IndexView>, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView<serde_json::Value>, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings<Unchecked>, Settings<Checked>, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind, Network, Remote, FilterableAttributesRule, FilterableAttributesPatterns, AttributePatterns, FilterableAttributesFeatures, FilterFeatures, Export))
)]
pub struct MeilisearchApi;
@ -115,6 +119,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/metrics").configure(metrics::configure)) .service(web::scope("/metrics").configure(metrics::configure))
.service(web::scope("/experimental-features").configure(features::configure)) .service(web::scope("/experimental-features").configure(features::configure))
.service(web::scope("/network").configure(network::configure)) .service(web::scope("/network").configure(network::configure))
.service(web::scope("/export").configure(export::configure))
.service(web::scope("/chats").configure(chats::configure)); .service(web::scope("/chats").configure(chats::configure));
#[cfg(feature = "swagger")] #[cfg(feature = "swagger")]
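With the scope registered, the task can be enqueued over plain HTTP. A client-side sketch using the `ureq` crate already in the dependency tree (host, key, and body values invented; assumes the route accepts POST):

    // Hypothetical trigger of the new export route.
    let response = ureq::post("http://localhost:7700/export")
        .set("Authorization", "Bearer MASTER_KEY")
        .send_json(serde_json::json!({
            "url": "http://replica:7700",
            "indexes": { "*": { "overrideSettings": false } }
        }))?;
    // The response is a summarized task view; the export itself runs
    // asynchronously under the new `export` task type.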

View File

@ -228,7 +228,7 @@ mod tests {
let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
snapshot!(meili_snap::json_string!(err), @r#"
{
"message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"

View File

@ -421,7 +421,7 @@ async fn error_add_api_key_invalid_parameters_actions() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response, { ".createdAt" => "[ignored]", ".updatedAt" => "[ignored]" }), @r###"
{
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"

View File

@ -93,7 +93,7 @@ async fn create_api_key_bad_actions() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
{
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"

View File

@ -42,7 +42,7 @@ async fn batch_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"

View File

@ -97,7 +97,7 @@ async fn task_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
@ -108,7 +108,7 @@ async fn task_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"
@ -119,7 +119,7 @@ async fn task_bad_types() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
"code": "invalid_task_types",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_types"

View File

@ -1,7 +1,7 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use rayon::{BroadcastContext, ThreadPool, ThreadPoolBuilder};
use thiserror::Error;
/// A rayon ThreadPool wrapper that can catch panics in the pool
@ -32,6 +32,22 @@ impl ThreadPoolNoAbort {
}
}
pub fn broadcast<OP, R>(&self, op: OP) -> Result<Vec<R>, PanicCatched>
where
OP: Fn(BroadcastContext<'_>) -> R + Sync,
R: Send,
{
self.active_operations.fetch_add(1, Ordering::Relaxed);
let output = self.thread_pool.broadcast(op);
self.active_operations.fetch_sub(1, Ordering::Relaxed);
// While resetting the pool panic catcher, we return an error if we caught one.
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
Err(PanicCatched)
} else {
Ok(output)
}
}
pub fn current_num_threads(&self) -> usize {
self.thread_pool.current_num_threads()
}
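A minimal usage sketch for the new `broadcast` helper (the builder call mirrors rayon's and is assumed here): the closure runs once on every thread of the pool, and a panic in any thread is reported as `PanicCatched` instead of aborting the process.

    // Hypothetical: collect the index of every thread in the pool.
    let pool = ThreadPoolNoAbortBuilder::new().num_threads(4).build()?;
    let indexes = pool.broadcast(|ctx| ctx.index())?;
    assert_eq!(indexes.len(), pool.current_num_threads());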

View File

@ -213,7 +213,7 @@ fn run_extraction_task<FE, FS, M>(
})
}
pub fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
REQUEST_THREADS.get_or_init(|| {
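Making `request_threads` public (and re-exporting it below) lets the export task reuse the shared pool for outbound HTTP work instead of building its own. A hypothetical caller:

    // Sketch: fan document-upload work out over the shared request pool.
    use meilisearch_types::milli::update::request_threads;
    let _per_thread = request_threads()
        .broadcast(|_ctx| {
            // each thread would pull payloads from a queue and POST them
        })?;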

View File

@ -12,6 +12,7 @@ use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender};
use enrich::enrich_documents_batch;
pub use extract::request_threads;
use grenad::{Merger, MergerBuilder};
use hashbrown::HashMap;
use heed::types::Str;

View File

@ -4,7 +4,7 @@ pub use self::clear_documents::ClearDocuments;
pub use self::concurrent_available_ids::ConcurrentAvailableIds;
pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::{request_threads, *};
pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
pub use self::new::ChannelCongestion;
pub use self::settings::{validate_embedding_settings, Setting, Settings};