mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-06-16 12:58:46 +02:00
Introduce a new route to export documents and enqueue the export task
This commit is contained in:
parent
c3368e6859
commit
3aab71730a
@ -141,6 +141,12 @@ pub enum KindDump {
|
||||
instance_uid: Option<InstanceUid>,
|
||||
},
|
||||
SnapshotCreation,
|
||||
Export {
|
||||
url: String,
|
||||
indexes: Vec<String>,
|
||||
skip_embeddings: bool,
|
||||
api_key: Option<String>,
|
||||
},
|
||||
UpgradeDatabase {
|
||||
from: (u32, u32, u32),
|
||||
},
|
||||
@ -213,6 +219,14 @@ impl From<KindWithContent> for KindDump {
|
||||
KindDump::DumpCreation { keys, instance_uid }
|
||||
}
|
||||
KindWithContent::SnapshotCreation => KindDump::SnapshotCreation,
|
||||
KindWithContent::Export { url, indexes, skip_embeddings, api_key } => {
|
||||
KindDump::Export {
|
||||
url,
|
||||
indexes: indexes.into_iter().map(|pattern| pattern.to_string()).collect(),
|
||||
skip_embeddings,
|
||||
api_key,
|
||||
}
|
||||
}
|
||||
KindWithContent::UpgradeDatabase { from: version } => {
|
||||
KindDump::UpgradeDatabase { from: version }
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ use std::io;
|
||||
use dump::{KindDump, TaskDump, UpdateFile};
|
||||
use meilisearch_types::batches::{Batch, BatchId};
|
||||
use meilisearch_types::heed::RwTxn;
|
||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||||
use roaring::RoaringBitmap;
|
||||
@ -211,6 +212,19 @@ impl<'a> Dump<'a> {
|
||||
KindWithContent::DumpCreation { keys, instance_uid }
|
||||
}
|
||||
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
||||
KindDump::Export { url, indexes, skip_embeddings, api_key } => {
|
||||
KindWithContent::Export {
|
||||
url,
|
||||
indexes: indexes
|
||||
.into_iter()
|
||||
.map(|index| {
|
||||
IndexUidPattern::try_from(index).map_err(|_| Error::CorruptedDump)
|
||||
})
|
||||
.collect::<Result<Vec<_>, Error>>()?,
|
||||
skip_embeddings,
|
||||
api_key,
|
||||
}
|
||||
}
|
||||
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
|
||||
},
|
||||
};
|
||||
|
@ -289,6 +289,9 @@ fn snapshot_details(d: &Details) -> String {
|
||||
Details::IndexSwap { swaps } => {
|
||||
format!("{{ swaps: {swaps:?} }}")
|
||||
}
|
||||
Details::Export { url, api_key, exported_documents, skip_embeddings } => {
|
||||
format!("{{ url: {url:?}, api_key: {api_key:?}, exported_documents: {exported_documents:?}, skip_embeddings: {skip_embeddings:?} }}")
|
||||
}
|
||||
Details::UpgradeDatabase { from, to } => {
|
||||
format!("{{ from: {from:?}, to: {to:?} }}")
|
||||
}
|
||||
|
@ -175,8 +175,17 @@ make_enum_progress! {
|
||||
}
|
||||
}
|
||||
|
||||
make_enum_progress! {
|
||||
pub enum Export {
|
||||
EnsuringCorrectnessOfTheTarget,
|
||||
ExportTheSettings,
|
||||
ExportTheDocuments,
|
||||
}
|
||||
}
|
||||
|
||||
make_atomic_progress!(Task alias AtomicTaskStep => "task" );
|
||||
make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
|
||||
make_atomic_progress!(Index alias AtomicIndexStep => "index" );
|
||||
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
|
||||
make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );
|
||||
|
||||
|
@ -71,6 +71,7 @@ impl From<KindWithContent> for AutobatchKind {
|
||||
KindWithContent::TaskCancelation { .. }
|
||||
| KindWithContent::TaskDeletion { .. }
|
||||
| KindWithContent::DumpCreation { .. }
|
||||
| KindWithContent::Export { .. }
|
||||
| KindWithContent::UpgradeDatabase { .. }
|
||||
| KindWithContent::SnapshotCreation => {
|
||||
panic!("The autobatcher should never be called with tasks that don't apply to an index.")
|
||||
|
@ -47,6 +47,9 @@ pub(crate) enum Batch {
|
||||
IndexSwap {
|
||||
task: Task,
|
||||
},
|
||||
Export {
|
||||
task: Task,
|
||||
},
|
||||
UpgradeDatabase {
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
@ -103,6 +106,7 @@ impl Batch {
|
||||
Batch::TaskCancelation { task, .. }
|
||||
| Batch::Dump(task)
|
||||
| Batch::IndexCreation { task, .. }
|
||||
| Batch::Export { task }
|
||||
| Batch::IndexUpdate { task, .. } => {
|
||||
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
|
||||
}
|
||||
@ -142,6 +146,7 @@ impl Batch {
|
||||
| TaskDeletions(_)
|
||||
| SnapshotCreation(_)
|
||||
| Dump(_)
|
||||
| Export { .. }
|
||||
| UpgradeDatabase { .. }
|
||||
| IndexSwap { .. } => None,
|
||||
IndexOperation { op, .. } => Some(op.index_uid()),
|
||||
@ -167,6 +172,7 @@ impl fmt::Display for Batch {
|
||||
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
|
||||
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
|
||||
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
|
||||
Batch::Export { .. } => f.write_str("Export")?,
|
||||
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
|
||||
};
|
||||
match index_uid {
|
||||
@ -426,9 +432,10 @@ impl IndexScheduler {
|
||||
/// 0. We get the *last* task to cancel.
|
||||
/// 1. We get the tasks to upgrade.
|
||||
/// 2. We get the *next* task to delete.
|
||||
/// 3. We get the *next* snapshot to process.
|
||||
/// 4. We get the *next* dump to process.
|
||||
/// 5. We get the *next* tasks to process for a specific index.
|
||||
/// 3. We get the *next* export to process.
|
||||
/// 4. We get the *next* snapshot to process.
|
||||
/// 5. We get the *next* dump to process.
|
||||
/// 6. We get the *next* tasks to process for a specific index.
|
||||
#[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
|
||||
pub(crate) fn create_next_batch(
|
||||
&self,
|
||||
@ -500,7 +507,17 @@ impl IndexScheduler {
|
||||
return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
|
||||
}
|
||||
|
||||
// 3. we batch the snapshot.
|
||||
// 3. we batch the export.
|
||||
let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued;
|
||||
if !to_export.is_empty() {
|
||||
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_export)?;
|
||||
current_batch.processing(&mut tasks);
|
||||
let task = tasks.pop().expect("There must be only one export task");
|
||||
current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::Export });
|
||||
return Ok(Some((Batch::Export { task }, current_batch)));
|
||||
}
|
||||
|
||||
// 4. we batch the snapshot.
|
||||
let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
|
||||
if !to_snapshot.is_empty() {
|
||||
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
|
||||
@ -510,7 +527,7 @@ impl IndexScheduler {
|
||||
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
|
||||
}
|
||||
|
||||
// 4. we batch the dumps.
|
||||
// 5. we batch the dumps.
|
||||
let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
|
||||
if let Some(to_dump) = to_dump.min() {
|
||||
let mut task =
|
||||
@ -523,7 +540,7 @@ impl IndexScheduler {
|
||||
return Ok(Some((Batch::Dump(task), current_batch)));
|
||||
}
|
||||
|
||||
// 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
|
||||
// 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
|
||||
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
|
||||
let mut task =
|
||||
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::panic::{catch_unwind, AssertUnwindSafe};
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
|
||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||
@ -13,9 +14,9 @@ use roaring::RoaringBitmap;
|
||||
|
||||
use super::create_batch::Batch;
|
||||
use crate::processing::{
|
||||
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
|
||||
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
|
||||
UpdateIndexProgress,
|
||||
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, Export,
|
||||
FinalizingIndexStep, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress,
|
||||
TaskDeletionProgress, UpdateIndexProgress,
|
||||
};
|
||||
use crate::utils::{
|
||||
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
|
||||
@ -361,6 +362,23 @@ impl IndexScheduler {
|
||||
task.status = Status::Succeeded;
|
||||
Ok((vec![task], ProcessBatchInfo::default()))
|
||||
}
|
||||
Batch::Export { mut task } => {
|
||||
progress.update_progress(Export::EnsuringCorrectnessOfTheTarget);
|
||||
|
||||
// TODO send check requests with the API Key
|
||||
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let KindWithContent::Export { url, indexes, skip_embeddings, api_key } = &task.kind
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
eprintln!("Exporting data to {}...", url);
|
||||
std::thread::sleep(Duration::from_secs(30));
|
||||
|
||||
task.status = Status::Succeeded;
|
||||
Ok((vec![task], ProcessBatchInfo::default()))
|
||||
}
|
||||
Batch::UpgradeDatabase { mut tasks } => {
|
||||
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
|
||||
unreachable!();
|
||||
|
@ -273,6 +273,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
|
||||
K::TaskCancelation { .. }
|
||||
| K::TaskDeletion { .. }
|
||||
| K::DumpCreation { .. }
|
||||
| K::Export { .. } // TODO I have patterns, not index uids
|
||||
| K::UpgradeDatabase { .. }
|
||||
| K::SnapshotCreation => (),
|
||||
};
|
||||
@ -600,6 +601,14 @@ impl crate::IndexScheduler {
|
||||
Details::Dump { dump_uid: _ } => {
|
||||
assert_eq!(kind.as_kind(), Kind::DumpCreation);
|
||||
}
|
||||
Details::Export {
|
||||
url: _,
|
||||
api_key: _,
|
||||
exported_documents: _,
|
||||
skip_embeddings: _,
|
||||
} => {
|
||||
assert_eq!(kind.as_kind(), Kind::Export);
|
||||
}
|
||||
Details::UpgradeDatabase { from: _, to: _ } => {
|
||||
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
|
||||
}
|
||||
|
@ -389,6 +389,11 @@ InvalidDocumentEditionContext , InvalidRequest , BAD_REQU
|
||||
InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ;
|
||||
EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSettingsIndexChat , InvalidRequest , BAD_REQUEST ;
|
||||
// Export
|
||||
InvalidExportUrl , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidExportApiKey , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidExportIndexesPatterns , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidExportSkipEmbeddings , InvalidRequest , BAD_REQUEST ;
|
||||
// Experimental features - Chat Completions
|
||||
UnimplementedExternalFunctionCalling , InvalidRequest , NOT_IMPLEMENTED ;
|
||||
UnimplementedNonStreamingChatCompletions , InvalidRequest , NOT_IMPLEMENTED ;
|
||||
|
@ -317,6 +317,9 @@ pub enum Action {
|
||||
#[serde(rename = "experimental.update")]
|
||||
#[deserr(rename = "experimental.update")]
|
||||
ExperimentalFeaturesUpdate,
|
||||
#[serde(rename = "export")]
|
||||
#[deserr(rename = "export")]
|
||||
Export,
|
||||
#[serde(rename = "network.get")]
|
||||
#[deserr(rename = "network.get")]
|
||||
NetworkGet,
|
||||
@ -438,6 +441,8 @@ pub mod actions {
|
||||
pub const EXPERIMENTAL_FEATURES_GET: u8 = ExperimentalFeaturesGet.repr();
|
||||
pub const EXPERIMENTAL_FEATURES_UPDATE: u8 = ExperimentalFeaturesUpdate.repr();
|
||||
|
||||
pub const EXPORT: u8 = Export.repr();
|
||||
|
||||
pub const NETWORK_GET: u8 = NetworkGet.repr();
|
||||
pub const NETWORK_UPDATE: u8 = NetworkUpdate.repr();
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use milli::Object;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::{Duration, OffsetDateTime};
|
||||
@ -118,6 +120,15 @@ pub struct DetailsView {
|
||||
pub upgrade_from: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub upgrade_to: Option<String>,
|
||||
// exporting
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub url: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub api_key: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub exported_documents: Option<BTreeMap<String, u32>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub skip_embeddings: Option<bool>,
|
||||
}
|
||||
|
||||
impl DetailsView {
|
||||
@ -238,6 +249,37 @@ impl DetailsView {
|
||||
Some(left)
|
||||
}
|
||||
},
|
||||
url: match (self.url.clone(), other.url.clone()) {
|
||||
(None, None) => None,
|
||||
(None, Some(url)) | (Some(url), None) => Some(url),
|
||||
// We should never be able to batch multiple exports at the same time.
|
||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
||||
(Some(left), Some(_right)) => Some(left),
|
||||
},
|
||||
api_key: match (self.api_key.clone(), other.api_key.clone()) {
|
||||
(None, None) => None,
|
||||
(None, Some(key)) | (Some(key), None) => Some(key),
|
||||
// We should never be able to batch multiple exports at the same time.
|
||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
||||
(Some(left), Some(_right)) => Some(left),
|
||||
},
|
||||
exported_documents: match (
|
||||
self.exported_documents.clone(),
|
||||
other.exported_documents.clone(),
|
||||
) {
|
||||
(None, None) => None,
|
||||
(None, Some(exp)) | (Some(exp), None) => Some(exp),
|
||||
// We should never be able to batch multiple exports at the same time.
|
||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
||||
(Some(left), Some(_right)) => Some(left),
|
||||
},
|
||||
skip_embeddings: match (self.skip_embeddings, other.skip_embeddings) {
|
||||
(None, None) => None,
|
||||
(None, Some(skip)) | (Some(skip), None) => Some(skip),
|
||||
// We should never be able to batch multiple exports at the same time.
|
||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
||||
(Some(left), Some(_right)) => Some(left),
|
||||
},
|
||||
// We want the earliest version
|
||||
upgrade_from: match (self.upgrade_from.clone(), other.upgrade_from.clone()) {
|
||||
(None, None) => None,
|
||||
@ -327,6 +369,9 @@ impl From<Details> for DetailsView {
|
||||
Details::IndexSwap { swaps } => {
|
||||
DetailsView { swaps: Some(swaps), ..Default::default() }
|
||||
}
|
||||
Details::Export { url, api_key, exported_documents, skip_embeddings } => {
|
||||
DetailsView { exported_documents: Some(exported_documents), ..Default::default() }
|
||||
}
|
||||
Details::UpgradeDatabase { from, to } => DetailsView {
|
||||
upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)),
|
||||
upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)),
|
||||
|
@ -1,5 +1,5 @@
|
||||
use core::fmt;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::fmt::{Display, Write};
|
||||
use std::str::FromStr;
|
||||
|
||||
@ -14,6 +14,7 @@ use uuid::Uuid;
|
||||
|
||||
use crate::batches::BatchId;
|
||||
use crate::error::ResponseError;
|
||||
use crate::index_uid_pattern::IndexUidPattern;
|
||||
use crate::keys::Key;
|
||||
use crate::settings::{Settings, Unchecked};
|
||||
use crate::{versioning, InstanceUid};
|
||||
@ -50,6 +51,7 @@ impl Task {
|
||||
| SnapshotCreation
|
||||
| TaskCancelation { .. }
|
||||
| TaskDeletion { .. }
|
||||
| Export { .. }
|
||||
| UpgradeDatabase { .. }
|
||||
| IndexSwap { .. } => None,
|
||||
DocumentAdditionOrUpdate { index_uid, .. }
|
||||
@ -86,6 +88,7 @@ impl Task {
|
||||
| KindWithContent::TaskDeletion { .. }
|
||||
| KindWithContent::DumpCreation { .. }
|
||||
| KindWithContent::SnapshotCreation
|
||||
| KindWithContent::Export { .. }
|
||||
| KindWithContent::UpgradeDatabase { .. } => None,
|
||||
}
|
||||
}
|
||||
@ -152,6 +155,12 @@ pub enum KindWithContent {
|
||||
instance_uid: Option<InstanceUid>,
|
||||
},
|
||||
SnapshotCreation,
|
||||
Export {
|
||||
url: String,
|
||||
api_key: Option<String>,
|
||||
indexes: Vec<IndexUidPattern>,
|
||||
skip_embeddings: bool,
|
||||
},
|
||||
UpgradeDatabase {
|
||||
from: (u32, u32, u32),
|
||||
},
|
||||
@ -180,6 +189,7 @@ impl KindWithContent {
|
||||
KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion,
|
||||
KindWithContent::DumpCreation { .. } => Kind::DumpCreation,
|
||||
KindWithContent::SnapshotCreation => Kind::SnapshotCreation,
|
||||
KindWithContent::Export { .. } => Kind::Export,
|
||||
KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase,
|
||||
}
|
||||
}
|
||||
@ -192,6 +202,7 @@ impl KindWithContent {
|
||||
| SnapshotCreation
|
||||
| TaskCancelation { .. }
|
||||
| TaskDeletion { .. }
|
||||
| Export { .. } // TODO Should I resolve the index names?
|
||||
| UpgradeDatabase { .. } => vec![],
|
||||
DocumentAdditionOrUpdate { index_uid, .. }
|
||||
| DocumentEdition { index_uid, .. }
|
||||
@ -269,6 +280,14 @@ impl KindWithContent {
|
||||
}),
|
||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||
KindWithContent::SnapshotCreation => None,
|
||||
KindWithContent::Export { url, api_key, indexes: _, skip_embeddings } => {
|
||||
Some(Details::Export {
|
||||
url: url.clone(),
|
||||
api_key: api_key.clone(),
|
||||
exported_documents: Default::default(),
|
||||
skip_embeddings: *skip_embeddings,
|
||||
})
|
||||
}
|
||||
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
||||
from: (from.0, from.1, from.2),
|
||||
to: (
|
||||
@ -335,6 +354,14 @@ impl KindWithContent {
|
||||
}),
|
||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||
KindWithContent::SnapshotCreation => None,
|
||||
KindWithContent::Export { url, api_key, indexes: _, skip_embeddings } => {
|
||||
Some(Details::Export {
|
||||
url: url.clone(),
|
||||
api_key: api_key.clone(),
|
||||
exported_documents: Default::default(),
|
||||
skip_embeddings: skip_embeddings.clone(),
|
||||
})
|
||||
}
|
||||
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
||||
from: *from,
|
||||
to: (
|
||||
@ -383,6 +410,14 @@ impl From<&KindWithContent> for Option<Details> {
|
||||
}),
|
||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||
KindWithContent::SnapshotCreation => None,
|
||||
KindWithContent::Export { url, api_key, indexes: _, skip_embeddings } => {
|
||||
Some(Details::Export {
|
||||
url: url.clone(),
|
||||
api_key: api_key.clone(),
|
||||
exported_documents: BTreeMap::default(),
|
||||
skip_embeddings: skip_embeddings.clone(),
|
||||
})
|
||||
}
|
||||
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
||||
from: *from,
|
||||
to: (
|
||||
@ -499,6 +534,7 @@ pub enum Kind {
|
||||
TaskDeletion,
|
||||
DumpCreation,
|
||||
SnapshotCreation,
|
||||
Export,
|
||||
UpgradeDatabase,
|
||||
}
|
||||
|
||||
@ -516,6 +552,7 @@ impl Kind {
|
||||
| Kind::TaskCancelation
|
||||
| Kind::TaskDeletion
|
||||
| Kind::DumpCreation
|
||||
| Kind::Export
|
||||
| Kind::UpgradeDatabase
|
||||
| Kind::SnapshotCreation => false,
|
||||
}
|
||||
@ -536,6 +573,7 @@ impl Display for Kind {
|
||||
Kind::TaskDeletion => write!(f, "taskDeletion"),
|
||||
Kind::DumpCreation => write!(f, "dumpCreation"),
|
||||
Kind::SnapshotCreation => write!(f, "snapshotCreation"),
|
||||
Kind::Export => write!(f, "export"),
|
||||
Kind::UpgradeDatabase => write!(f, "upgradeDatabase"),
|
||||
}
|
||||
}
|
||||
@ -643,6 +681,12 @@ pub enum Details {
|
||||
IndexSwap {
|
||||
swaps: Vec<IndexSwap>,
|
||||
},
|
||||
Export {
|
||||
url: String,
|
||||
api_key: Option<String>,
|
||||
exported_documents: BTreeMap<String, u32>,
|
||||
skip_embeddings: bool,
|
||||
},
|
||||
UpgradeDatabase {
|
||||
from: (u32, u32, u32),
|
||||
to: (u32, u32, u32),
|
||||
@ -667,6 +711,7 @@ impl Details {
|
||||
Self::SettingsUpdate { .. }
|
||||
| Self::IndexInfo { .. }
|
||||
| Self::Dump { .. }
|
||||
| Self::Export { .. }
|
||||
| Self::UpgradeDatabase { .. }
|
||||
| Self::IndexSwap { .. } => (),
|
||||
}
|
||||
|
105
crates/meilisearch/src/routes/export.rs
Normal file
105
crates/meilisearch/src/routes/export.rs
Normal file
@ -0,0 +1,105 @@
|
||||
use actix_web::web::{self, Data};
|
||||
use actix_web::{HttpRequest, HttpResponse};
|
||||
use deserr::actix_web::AwebJson;
|
||||
use deserr::Deserr;
|
||||
use index_scheduler::IndexScheduler;
|
||||
use meilisearch_types::deserr::DeserrJsonError;
|
||||
use meilisearch_types::error::deserr_codes::*;
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||
use meilisearch_types::keys::actions;
|
||||
use meilisearch_types::tasks::KindWithContent;
|
||||
use serde::Serialize;
|
||||
use tracing::debug;
|
||||
use utoipa::{OpenApi, ToSchema};
|
||||
|
||||
use crate::analytics::Analytics;
|
||||
use crate::extractors::authentication::policies::ActionPolicy;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
|
||||
use crate::Opt;
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(export),
|
||||
tags((
|
||||
name = "Export",
|
||||
description = "The `/export` route allows you to trigger an export process to a remote Meilisearch instance.",
|
||||
external_docs(url = "https://www.meilisearch.com/docs/reference/api/export"),
|
||||
)),
|
||||
)]
|
||||
pub struct ExportApi;
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(web::resource("").route(web::post().to(export)));
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "",
|
||||
tag = "Export",
|
||||
security(("Bearer" = ["export", "*"])),
|
||||
responses(
|
||||
(status = OK, description = "Known nodes are returned", body = Export, content_type = "application/json", example = json!(
|
||||
{
|
||||
"indexes": ["movie", "steam-*"],
|
||||
"skip_embeddings": true,
|
||||
"apiKey": "meilisearch-api-key"
|
||||
})),
|
||||
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
|
||||
{
|
||||
"message": "The Authorization header is missing. It must use the bearer authorization method.",
|
||||
"code": "missing_authorization_header",
|
||||
"type": "auth",
|
||||
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
|
||||
}
|
||||
)),
|
||||
)
|
||||
)]
|
||||
async fn export(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::EXPORT }>, Data<IndexScheduler>>,
|
||||
export: AwebJson<Export, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
opt: web::Data<Opt>,
|
||||
_analytics: Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
// TODO make it experimental?
|
||||
// index_scheduler.features().check_network("Using the /network route")?;
|
||||
|
||||
let export = export.into_inner();
|
||||
debug!(returns = ?export, "Trigger export");
|
||||
|
||||
let Export { url, api_key, indexes, skip_embeddings } = export;
|
||||
let task = KindWithContent::Export { url, api_key, indexes, skip_embeddings };
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
|
||||
Ok(HttpResponse::Ok().json(task))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[schema(rename_all = "camelCase")]
|
||||
pub struct Export {
|
||||
#[schema(value_type = Option<String>, example = json!("https://ms-1234.heaven.meilisearch.com"))]
|
||||
#[serde(default)]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidExportUrl>)]
|
||||
pub url: String,
|
||||
#[schema(value_type = Option<String>, example = json!("1234abcd"))]
|
||||
#[serde(default)]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidExportApiKey>)]
|
||||
pub api_key: Option<String>,
|
||||
#[schema(value_type = Option<BTreeSet<String>>, example = json!(["movies", "steam-*"]))]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidExportIndexesPatterns>)]
|
||||
#[serde(default)]
|
||||
pub indexes: Vec<IndexUidPattern>,
|
||||
#[schema(value_type = Option<bool>, example = json!("true"))]
|
||||
#[serde(default)]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidExportSkipEmbeddings>)]
|
||||
pub skip_embeddings: bool,
|
||||
}
|
@ -54,6 +54,7 @@ mod api_key;
|
||||
pub mod batches;
|
||||
pub mod chats;
|
||||
mod dump;
|
||||
mod export;
|
||||
pub mod features;
|
||||
pub mod indexes;
|
||||
mod logs;
|
||||
@ -84,6 +85,7 @@ mod tasks_test;
|
||||
(path = "/multi-search", api = multi_search::MultiSearchApi),
|
||||
(path = "/swap-indexes", api = swap_indexes::SwapIndexesApi),
|
||||
(path = "/experimental-features", api = features::ExperimentalFeaturesApi),
|
||||
(path = "/export", api = export::ExportApi),
|
||||
(path = "/network", api = network::NetworkApi),
|
||||
),
|
||||
paths(get_health, get_version, get_stats),
|
||||
@ -115,6 +117,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
.service(web::scope("/metrics").configure(metrics::configure))
|
||||
.service(web::scope("/experimental-features").configure(features::configure))
|
||||
.service(web::scope("/network").configure(network::configure))
|
||||
.service(web::scope("/export").configure(export::configure))
|
||||
.service(web::scope("/chats").configure(chats::configure));
|
||||
|
||||
#[cfg(feature = "swagger")]
|
||||
|
Loading…
x
Reference in New Issue
Block a user