mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-06-16 21:08:45 +02:00
Working first implementation
This commit is contained in:
parent
3aab71730a
commit
23e25a437c
@ -1,12 +1,16 @@
|
|||||||
#![allow(clippy::type_complexity)]
|
#![allow(clippy::type_complexity)]
|
||||||
#![allow(clippy::wrong_self_convention)]
|
#![allow(clippy::wrong_self_convention)]
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use meilisearch_types::batches::BatchId;
|
use meilisearch_types::batches::BatchId;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::ResponseError;
|
||||||
use meilisearch_types::keys::Key;
|
use meilisearch_types::keys::Key;
|
||||||
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
||||||
use meilisearch_types::settings::Unchecked;
|
use meilisearch_types::settings::Unchecked;
|
||||||
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task, TaskId};
|
use meilisearch_types::tasks::{
|
||||||
|
Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId,
|
||||||
|
};
|
||||||
use meilisearch_types::InstanceUid;
|
use meilisearch_types::InstanceUid;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -143,9 +147,8 @@ pub enum KindDump {
|
|||||||
SnapshotCreation,
|
SnapshotCreation,
|
||||||
Export {
|
Export {
|
||||||
url: String,
|
url: String,
|
||||||
indexes: Vec<String>,
|
|
||||||
skip_embeddings: bool,
|
|
||||||
api_key: Option<String>,
|
api_key: Option<String>,
|
||||||
|
indexes: BTreeMap<String, ExportIndexSettings>,
|
||||||
},
|
},
|
||||||
UpgradeDatabase {
|
UpgradeDatabase {
|
||||||
from: (u32, u32, u32),
|
from: (u32, u32, u32),
|
||||||
@ -219,14 +222,14 @@ impl From<KindWithContent> for KindDump {
|
|||||||
KindDump::DumpCreation { keys, instance_uid }
|
KindDump::DumpCreation { keys, instance_uid }
|
||||||
}
|
}
|
||||||
KindWithContent::SnapshotCreation => KindDump::SnapshotCreation,
|
KindWithContent::SnapshotCreation => KindDump::SnapshotCreation,
|
||||||
KindWithContent::Export { url, indexes, skip_embeddings, api_key } => {
|
KindWithContent::Export { url, api_key, indexes } => KindDump::Export {
|
||||||
KindDump::Export {
|
url,
|
||||||
url,
|
api_key,
|
||||||
indexes: indexes.into_iter().map(|pattern| pattern.to_string()).collect(),
|
indexes: indexes
|
||||||
skip_embeddings,
|
.into_iter()
|
||||||
api_key,
|
.map(|(pattern, settings)| (pattern.to_string(), settings))
|
||||||
}
|
.collect(),
|
||||||
}
|
},
|
||||||
KindWithContent::UpgradeDatabase { from: version } => {
|
KindWithContent::UpgradeDatabase { from: version } => {
|
||||||
KindDump::UpgradeDatabase { from: version }
|
KindDump::UpgradeDatabase { from: version }
|
||||||
}
|
}
|
||||||
|
@ -212,19 +212,20 @@ impl<'a> Dump<'a> {
|
|||||||
KindWithContent::DumpCreation { keys, instance_uid }
|
KindWithContent::DumpCreation { keys, instance_uid }
|
||||||
}
|
}
|
||||||
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
||||||
KindDump::Export { url, indexes, skip_embeddings, api_key } => {
|
KindDump::Export { url, indexes, api_key } => KindWithContent::Export {
|
||||||
KindWithContent::Export {
|
url,
|
||||||
url,
|
api_key,
|
||||||
indexes: indexes
|
indexes: indexes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|index| {
|
.map(|(pattern, settings)| {
|
||||||
IndexUidPattern::try_from(index).map_err(|_| Error::CorruptedDump)
|
Ok((
|
||||||
})
|
IndexUidPattern::try_from(pattern)
|
||||||
.collect::<Result<Vec<_>, Error>>()?,
|
.map_err(|_| Error::CorruptedDump)?,
|
||||||
skip_embeddings,
|
settings,
|
||||||
api_key,
|
))
|
||||||
}
|
})
|
||||||
}
|
.collect::<Result<_, Error>>()?,
|
||||||
|
},
|
||||||
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
|
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -151,6 +151,8 @@ pub enum Error {
|
|||||||
CorruptedTaskQueue,
|
CorruptedTaskQueue,
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
DatabaseUpgrade(Box<Self>),
|
DatabaseUpgrade(Box<Self>),
|
||||||
|
#[error(transparent)]
|
||||||
|
Export(Box<Self>),
|
||||||
#[error("Failed to rollback for index `{index}`: {rollback_outcome} ")]
|
#[error("Failed to rollback for index `{index}`: {rollback_outcome} ")]
|
||||||
RollbackFailed { index: String, rollback_outcome: RollbackOutcome },
|
RollbackFailed { index: String, rollback_outcome: RollbackOutcome },
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
@ -221,6 +223,7 @@ impl Error {
|
|||||||
| Error::IoError(_)
|
| Error::IoError(_)
|
||||||
| Error::Persist(_)
|
| Error::Persist(_)
|
||||||
| Error::FeatureNotEnabled(_)
|
| Error::FeatureNotEnabled(_)
|
||||||
|
| Error::Export(_)
|
||||||
| Error::Anyhow(_) => true,
|
| Error::Anyhow(_) => true,
|
||||||
Error::CreateBatch(_)
|
Error::CreateBatch(_)
|
||||||
| Error::CorruptedTaskQueue
|
| Error::CorruptedTaskQueue
|
||||||
@ -294,6 +297,7 @@ impl ErrorCode for Error {
|
|||||||
Error::CorruptedTaskQueue => Code::Internal,
|
Error::CorruptedTaskQueue => Code::Internal,
|
||||||
Error::CorruptedDump => Code::Internal,
|
Error::CorruptedDump => Code::Internal,
|
||||||
Error::DatabaseUpgrade(_) => Code::Internal,
|
Error::DatabaseUpgrade(_) => Code::Internal,
|
||||||
|
Error::Export(_) => Code::Internal,
|
||||||
Error::RollbackFailed { .. } => Code::Internal,
|
Error::RollbackFailed { .. } => Code::Internal,
|
||||||
Error::UnrecoverableError(_) => Code::Internal,
|
Error::UnrecoverableError(_) => Code::Internal,
|
||||||
Error::IndexSchedulerVersionMismatch { .. } => Code::Internal,
|
Error::IndexSchedulerVersionMismatch { .. } => Code::Internal,
|
||||||
|
@ -289,8 +289,8 @@ fn snapshot_details(d: &Details) -> String {
|
|||||||
Details::IndexSwap { swaps } => {
|
Details::IndexSwap { swaps } => {
|
||||||
format!("{{ swaps: {swaps:?} }}")
|
format!("{{ swaps: {swaps:?} }}")
|
||||||
}
|
}
|
||||||
Details::Export { url, api_key, exported_documents, skip_embeddings } => {
|
Details::Export { url, api_key, indexes } => {
|
||||||
format!("{{ url: {url:?}, api_key: {api_key:?}, exported_documents: {exported_documents:?}, skip_embeddings: {skip_embeddings:?} }}")
|
format!("{{ url: {url:?}, api_key: {api_key:?}, indexes: {indexes:?} }}")
|
||||||
}
|
}
|
||||||
Details::UpgradeDatabase { from, to } => {
|
Details::UpgradeDatabase { from, to } => {
|
||||||
format!("{{ from: {from:?}, to: {to:?} }}")
|
format!("{{ from: {from:?}, to: {to:?} }}")
|
||||||
|
@ -4,6 +4,7 @@ mod autobatcher_test;
|
|||||||
mod create_batch;
|
mod create_batch;
|
||||||
mod process_batch;
|
mod process_batch;
|
||||||
mod process_dump_creation;
|
mod process_dump_creation;
|
||||||
|
mod process_export;
|
||||||
mod process_index_operation;
|
mod process_index_operation;
|
||||||
mod process_snapshot_creation;
|
mod process_snapshot_creation;
|
||||||
mod process_upgrade;
|
mod process_upgrade;
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||||
use std::panic::{catch_unwind, AssertUnwindSafe};
|
use std::panic::{catch_unwind, AssertUnwindSafe};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
|
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
|
||||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||||
@ -14,9 +13,9 @@ use roaring::RoaringBitmap;
|
|||||||
|
|
||||||
use super::create_batch::Batch;
|
use super::create_batch::Batch;
|
||||||
use crate::processing::{
|
use crate::processing::{
|
||||||
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, Export,
|
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
|
||||||
FinalizingIndexStep, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress,
|
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
|
||||||
TaskDeletionProgress, UpdateIndexProgress,
|
UpdateIndexProgress,
|
||||||
};
|
};
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
|
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
|
||||||
@ -363,18 +362,32 @@ impl IndexScheduler {
|
|||||||
Ok((vec![task], ProcessBatchInfo::default()))
|
Ok((vec![task], ProcessBatchInfo::default()))
|
||||||
}
|
}
|
||||||
Batch::Export { mut task } => {
|
Batch::Export { mut task } => {
|
||||||
progress.update_progress(Export::EnsuringCorrectnessOfTheTarget);
|
let KindWithContent::Export { url, indexes, api_key } = &task.kind else {
|
||||||
|
|
||||||
// TODO send check requests with the API Key
|
|
||||||
|
|
||||||
let mut wtxn = self.env.write_txn()?;
|
|
||||||
let KindWithContent::Export { url, indexes, skip_embeddings, api_key } = &task.kind
|
|
||||||
else {
|
|
||||||
unreachable!()
|
unreachable!()
|
||||||
};
|
};
|
||||||
|
|
||||||
eprintln!("Exporting data to {}...", url);
|
let ret = catch_unwind(AssertUnwindSafe(|| {
|
||||||
std::thread::sleep(Duration::from_secs(30));
|
self.process_export(url, indexes, api_key.as_deref(), progress)
|
||||||
|
}));
|
||||||
|
|
||||||
|
match ret {
|
||||||
|
// TODO return the matched and exported documents
|
||||||
|
Ok(Ok(())) => (),
|
||||||
|
Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask),
|
||||||
|
Ok(Err(e)) => return Err(Error::Export(Box::new(e))),
|
||||||
|
Err(e) => {
|
||||||
|
let msg = match e.downcast_ref::<&'static str>() {
|
||||||
|
Some(s) => *s,
|
||||||
|
None => match e.downcast_ref::<String>() {
|
||||||
|
Some(s) => &s[..],
|
||||||
|
None => "Box<dyn Any>",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
return Err(Error::Export(Box::new(Error::ProcessBatchPanicked(
|
||||||
|
msg.to_string(),
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
task.status = Status::Succeeded;
|
task.status = Status::Succeeded;
|
||||||
Ok((vec![task], ProcessBatchInfo::default()))
|
Ok((vec![task], ProcessBatchInfo::default()))
|
||||||
@ -726,9 +739,11 @@ impl IndexScheduler {
|
|||||||
from.1,
|
from.1,
|
||||||
from.2
|
from.2
|
||||||
);
|
);
|
||||||
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
let ret = catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||||
self.process_rollback(from, progress)
|
self.process_rollback(from, progress)
|
||||||
})) {
|
}));
|
||||||
|
|
||||||
|
match ret {
|
||||||
Ok(Ok(())) => {}
|
Ok(Ok(())) => {}
|
||||||
Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))),
|
Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
141
crates/index-scheduler/src/scheduler/process_export.rs
Normal file
141
crates/index-scheduler/src/scheduler/process_export.rs
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||||
|
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||||
|
use meilisearch_types::milli::{obkv_to_json, Filter};
|
||||||
|
use meilisearch_types::settings::{self, SecretPolicy};
|
||||||
|
use meilisearch_types::tasks::ExportIndexSettings;
|
||||||
|
use ureq::{json, Agent};
|
||||||
|
|
||||||
|
use crate::{Error, IndexScheduler, Result};
|
||||||
|
|
||||||
|
impl IndexScheduler {
|
||||||
|
pub(super) fn process_export(
|
||||||
|
&self,
|
||||||
|
url: &str,
|
||||||
|
indexes: &BTreeMap<IndexUidPattern, ExportIndexSettings>,
|
||||||
|
api_key: Option<&str>,
|
||||||
|
progress: Progress,
|
||||||
|
) -> Result<()> {
|
||||||
|
#[cfg(test)]
|
||||||
|
self.maybe_fail(crate::test_utils::FailureLocation::ProcessExport)?;
|
||||||
|
|
||||||
|
let indexes: Vec<_> = self
|
||||||
|
.index_names()?
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|uid| {
|
||||||
|
indexes
|
||||||
|
.iter()
|
||||||
|
.find(|(pattern, _)| pattern.matches_str(&uid))
|
||||||
|
.map(|(_pattern, settings)| (uid, settings))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let agent: Agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
|
||||||
|
|
||||||
|
for (i, (uid, settings)) in indexes.iter().enumerate() {
|
||||||
|
let must_stop_processing = self.scheduler.must_stop_processing.clone();
|
||||||
|
if must_stop_processing.get() {
|
||||||
|
return Err(Error::AbortedTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
progress.update_progress(VariableNameStep::<ExportIndex>::new(
|
||||||
|
format!("Exporting index `{uid}`"),
|
||||||
|
i as u32,
|
||||||
|
indexes.len() as u32,
|
||||||
|
));
|
||||||
|
|
||||||
|
let ExportIndexSettings { skip_embeddings, filter } = settings;
|
||||||
|
let index = self.index(uid)?;
|
||||||
|
let index_rtxn = index.read_txn()?;
|
||||||
|
|
||||||
|
// Send the primary key
|
||||||
|
let primary_key = index.primary_key(&index_rtxn).unwrap();
|
||||||
|
// TODO implement retry logic
|
||||||
|
let mut request = agent.post(&format!("{url}/indexes"));
|
||||||
|
if let Some(api_key) = api_key {
|
||||||
|
request = request.set("Authorization", &format!("Bearer {api_key}"));
|
||||||
|
}
|
||||||
|
request.send_json(&json!({ "uid": uid, "primaryKey": primary_key })).unwrap();
|
||||||
|
|
||||||
|
// Send the index settings
|
||||||
|
let settings = settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets)
|
||||||
|
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||||
|
// TODO implement retry logic
|
||||||
|
// improve error reporting (get error message)
|
||||||
|
let mut request = agent.patch(&format!("{url}/indexes/{uid}/settings"));
|
||||||
|
if let Some(api_key) = api_key {
|
||||||
|
request = request.set("Authorization", &format!("Bearer {api_key}"));
|
||||||
|
}
|
||||||
|
request.send_json(settings).unwrap();
|
||||||
|
|
||||||
|
let filter = filter
|
||||||
|
.as_deref()
|
||||||
|
.map(Filter::from_str)
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
let filter_universe = filter
|
||||||
|
.map(|f| f.evaluate(&index_rtxn, &index))
|
||||||
|
.transpose()
|
||||||
|
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||||
|
let whole_universe = index
|
||||||
|
.documents_ids(&index_rtxn)
|
||||||
|
.map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
|
||||||
|
let universe = filter_universe.unwrap_or(whole_universe);
|
||||||
|
|
||||||
|
let fields_ids_map = index.fields_ids_map(&index_rtxn)?;
|
||||||
|
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||||
|
let embedding_configs = index
|
||||||
|
.embedding_configs(&index_rtxn)
|
||||||
|
.map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
|
||||||
|
|
||||||
|
let limit = 50 * 1024 * 1024; // 50 MiB
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
let mut tmp_buffer = Vec::new();
|
||||||
|
for docid in universe {
|
||||||
|
let document = index
|
||||||
|
.document(&index_rtxn, docid)
|
||||||
|
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||||
|
|
||||||
|
let value = obkv_to_json(&all_fields, &fields_ids_map, document)
|
||||||
|
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||||
|
|
||||||
|
tmp_buffer.clear();
|
||||||
|
serde_json::to_writer(&mut tmp_buffer, &value)
|
||||||
|
.map_err(meilisearch_types::milli::InternalError::from)
|
||||||
|
.map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
|
||||||
|
|
||||||
|
if buffer.len() + tmp_buffer.len() > limit {
|
||||||
|
// TODO implement retry logic
|
||||||
|
post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
|
||||||
|
buffer.clear();
|
||||||
|
}
|
||||||
|
buffer.extend_from_slice(&tmp_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn post_serialized_documents(
|
||||||
|
agent: &Agent,
|
||||||
|
url: &str,
|
||||||
|
uid: &str,
|
||||||
|
api_key: Option<&str>,
|
||||||
|
buffer: &[u8],
|
||||||
|
) -> Result<ureq::Response, ureq::Error> {
|
||||||
|
let mut request = agent.post(&format!("{url}/indexes/{uid}/documents"));
|
||||||
|
request = request.set("Content-Type", "application/x-ndjson");
|
||||||
|
if let Some(api_key) = api_key {
|
||||||
|
request = request.set("Authorization", &format!("Bearer {api_key}"));
|
||||||
|
}
|
||||||
|
request.send_bytes(buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Variant-less marker type, used only as the type parameter of
/// `VariableNameStep::<ExportIndex>` when reporting per-index export progress.
enum ExportIndex {}
|
@ -37,6 +37,7 @@ pub(crate) enum FailureLocation {
|
|||||||
InsideCreateBatch,
|
InsideCreateBatch,
|
||||||
InsideProcessBatch,
|
InsideProcessBatch,
|
||||||
PanicInsideProcessBatch,
|
PanicInsideProcessBatch,
|
||||||
|
ProcessExport,
|
||||||
ProcessUpgrade,
|
ProcessUpgrade,
|
||||||
AcquiringWtxn,
|
AcquiringWtxn,
|
||||||
UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 },
|
UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 },
|
||||||
|
@ -601,12 +601,7 @@ impl crate::IndexScheduler {
|
|||||||
Details::Dump { dump_uid: _ } => {
|
Details::Dump { dump_uid: _ } => {
|
||||||
assert_eq!(kind.as_kind(), Kind::DumpCreation);
|
assert_eq!(kind.as_kind(), Kind::DumpCreation);
|
||||||
}
|
}
|
||||||
Details::Export {
|
Details::Export { url: _, api_key: _, indexes: _ } => {
|
||||||
url: _,
|
|
||||||
api_key: _,
|
|
||||||
exported_documents: _,
|
|
||||||
skip_embeddings: _,
|
|
||||||
} => {
|
|
||||||
assert_eq!(kind.as_kind(), Kind::Export);
|
assert_eq!(kind.as_kind(), Kind::Export);
|
||||||
}
|
}
|
||||||
Details::UpgradeDatabase { from: _, to: _ } => {
|
Details::UpgradeDatabase { from: _, to: _ } => {
|
||||||
|
@ -393,7 +393,8 @@ InvalidSettingsIndexChat , InvalidRequest , BAD_REQU
|
|||||||
InvalidExportUrl , InvalidRequest , BAD_REQUEST ;
|
InvalidExportUrl , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidExportApiKey , InvalidRequest , BAD_REQUEST ;
|
InvalidExportApiKey , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidExportIndexesPatterns , InvalidRequest , BAD_REQUEST ;
|
InvalidExportIndexesPatterns , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidExportSkipEmbeddings , InvalidRequest , BAD_REQUEST ;
|
InvalidExportIndexSkipEmbeddings , InvalidRequest , BAD_REQUEST ;
|
||||||
|
InvalidExportIndexFilter , InvalidRequest , BAD_REQUEST ;
|
||||||
// Experimental features - Chat Completions
|
// Experimental features - Chat Completions
|
||||||
UnimplementedExternalFunctionCalling , InvalidRequest , NOT_IMPLEMENTED ;
|
UnimplementedExternalFunctionCalling , InvalidRequest , NOT_IMPLEMENTED ;
|
||||||
UnimplementedNonStreamingChatCompletions , InvalidRequest , NOT_IMPLEMENTED ;
|
UnimplementedNonStreamingChatCompletions , InvalidRequest , NOT_IMPLEMENTED ;
|
||||||
|
@ -12,7 +12,7 @@ use crate::index_uid::{IndexUid, IndexUidFormatError};
|
|||||||
|
|
||||||
/// An index uid pattern is composed of only ascii alphanumeric characters, - and _, between 1 and 400
|
/// An index uid pattern is composed of only ascii alphanumeric characters, - and _, between 1 and 400
|
||||||
/// bytes long and optionally ending with a *.
|
/// bytes long and optionally ending with a *.
|
||||||
#[derive(Serialize, Deserialize, Deserr, Debug, Clone, PartialEq, Eq, Hash)]
|
#[derive(Serialize, Deserialize, Deserr, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
#[deserr(try_from(&String) = FromStr::from_str -> IndexUidPatternFormatError)]
|
#[deserr(try_from(&String) = FromStr::from_str -> IndexUidPatternFormatError)]
|
||||||
pub struct IndexUidPattern(String);
|
pub struct IndexUidPattern(String);
|
||||||
|
|
||||||
|
@ -8,7 +8,9 @@ use utoipa::ToSchema;
|
|||||||
use crate::batches::BatchId;
|
use crate::batches::BatchId;
|
||||||
use crate::error::ResponseError;
|
use crate::error::ResponseError;
|
||||||
use crate::settings::{Settings, Unchecked};
|
use crate::settings::{Settings, Unchecked};
|
||||||
use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
|
use crate::tasks::{
|
||||||
|
serialize_duration, Details, DetailsExportIndexSettings, IndexSwap, Kind, Status, Task, TaskId,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, ToSchema)]
|
#[derive(Debug, Clone, PartialEq, Serialize, ToSchema)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
@ -126,9 +128,7 @@ pub struct DetailsView {
|
|||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub api_key: Option<String>,
|
pub api_key: Option<String>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub exported_documents: Option<BTreeMap<String, u32>>,
|
pub indexes: Option<BTreeMap<String, DetailsExportIndexSettings>>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub skip_embeddings: Option<bool>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DetailsView {
|
impl DetailsView {
|
||||||
@ -263,19 +263,9 @@ impl DetailsView {
|
|||||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
||||||
(Some(left), Some(_right)) => Some(left),
|
(Some(left), Some(_right)) => Some(left),
|
||||||
},
|
},
|
||||||
exported_documents: match (
|
indexes: match (self.indexes.clone(), other.indexes.clone()) {
|
||||||
self.exported_documents.clone(),
|
|
||||||
other.exported_documents.clone(),
|
|
||||||
) {
|
|
||||||
(None, None) => None,
|
(None, None) => None,
|
||||||
(None, Some(exp)) | (Some(exp), None) => Some(exp),
|
(None, Some(indexes)) | (Some(indexes), None) => Some(indexes),
|
||||||
// We should never be able to batch multiple exports at the same time.
|
|
||||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
|
||||||
(Some(left), Some(_right)) => Some(left),
|
|
||||||
},
|
|
||||||
skip_embeddings: match (self.skip_embeddings, other.skip_embeddings) {
|
|
||||||
(None, None) => None,
|
|
||||||
(None, Some(skip)) | (Some(skip), None) => Some(skip),
|
|
||||||
// We should never be able to batch multiple exports at the same time.
|
// We should never be able to batch multiple exports at the same time.
|
||||||
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
// So we return the first one we encounter but that shouldn't be an issue anyway.
|
||||||
(Some(left), Some(_right)) => Some(left),
|
(Some(left), Some(_right)) => Some(left),
|
||||||
@ -369,9 +359,17 @@ impl From<Details> for DetailsView {
|
|||||||
Details::IndexSwap { swaps } => {
|
Details::IndexSwap { swaps } => {
|
||||||
DetailsView { swaps: Some(swaps), ..Default::default() }
|
DetailsView { swaps: Some(swaps), ..Default::default() }
|
||||||
}
|
}
|
||||||
Details::Export { url, api_key, exported_documents, skip_embeddings } => {
|
Details::Export { url, api_key, indexes } => DetailsView {
|
||||||
DetailsView { exported_documents: Some(exported_documents), ..Default::default() }
|
url: Some(url),
|
||||||
}
|
api_key,
|
||||||
|
indexes: Some(
|
||||||
|
indexes
|
||||||
|
.into_iter()
|
||||||
|
.map(|(pattern, settings)| (pattern.to_string(), settings))
|
||||||
|
.collect(),
|
||||||
|
),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
Details::UpgradeDatabase { from, to } => DetailsView {
|
Details::UpgradeDatabase { from, to } => DetailsView {
|
||||||
upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)),
|
upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)),
|
||||||
upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)),
|
upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)),
|
||||||
|
@ -9,7 +9,7 @@ use milli::Object;
|
|||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde::{Deserialize, Serialize, Serializer};
|
use serde::{Deserialize, Serialize, Serializer};
|
||||||
use time::{Duration, OffsetDateTime};
|
use time::{Duration, OffsetDateTime};
|
||||||
use utoipa::ToSchema;
|
use utoipa::{schema, ToSchema};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::batches::BatchId;
|
use crate::batches::BatchId;
|
||||||
@ -158,8 +158,7 @@ pub enum KindWithContent {
|
|||||||
Export {
|
Export {
|
||||||
url: String,
|
url: String,
|
||||||
api_key: Option<String>,
|
api_key: Option<String>,
|
||||||
indexes: Vec<IndexUidPattern>,
|
indexes: BTreeMap<IndexUidPattern, ExportIndexSettings>,
|
||||||
skip_embeddings: bool,
|
|
||||||
},
|
},
|
||||||
UpgradeDatabase {
|
UpgradeDatabase {
|
||||||
from: (u32, u32, u32),
|
from: (u32, u32, u32),
|
||||||
@ -172,6 +171,13 @@ pub struct IndexSwap {
|
|||||||
pub indexes: (String, String),
|
pub indexes: (String, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Per-index options of an export task, keyed by index uid pattern in
/// `KindWithContent::Export`. Serialized with camelCase field names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ExportIndexSettings {
|
||||||
|
/// Whether embeddings should be left out of the export.
/// NOTE(review): `process_export` reads this flag but does not act on it yet.
pub skip_embeddings: bool,
|
||||||
|
/// Optional filter expression; when set, `process_export` parses it with
/// `Filter::from_str` and exports only the documents it selects.
pub filter: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
impl KindWithContent {
|
impl KindWithContent {
|
||||||
pub fn as_kind(&self) -> Kind {
|
pub fn as_kind(&self) -> Kind {
|
||||||
match self {
|
match self {
|
||||||
@ -280,14 +286,11 @@ impl KindWithContent {
|
|||||||
}),
|
}),
|
||||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||||
KindWithContent::SnapshotCreation => None,
|
KindWithContent::SnapshotCreation => None,
|
||||||
KindWithContent::Export { url, api_key, indexes: _, skip_embeddings } => {
|
KindWithContent::Export { url, api_key, indexes } => Some(Details::Export {
|
||||||
Some(Details::Export {
|
url: url.clone(),
|
||||||
url: url.clone(),
|
api_key: api_key.clone(),
|
||||||
api_key: api_key.clone(),
|
indexes: indexes.into_iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(),
|
||||||
exported_documents: Default::default(),
|
}),
|
||||||
skip_embeddings: *skip_embeddings,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
||||||
from: (from.0, from.1, from.2),
|
from: (from.0, from.1, from.2),
|
||||||
to: (
|
to: (
|
||||||
@ -354,14 +357,11 @@ impl KindWithContent {
|
|||||||
}),
|
}),
|
||||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||||
KindWithContent::SnapshotCreation => None,
|
KindWithContent::SnapshotCreation => None,
|
||||||
KindWithContent::Export { url, api_key, indexes: _, skip_embeddings } => {
|
KindWithContent::Export { url, api_key, indexes } => Some(Details::Export {
|
||||||
Some(Details::Export {
|
url: url.clone(),
|
||||||
url: url.clone(),
|
api_key: api_key.clone(),
|
||||||
api_key: api_key.clone(),
|
indexes: indexes.into_iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(),
|
||||||
exported_documents: Default::default(),
|
}),
|
||||||
skip_embeddings: skip_embeddings.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
||||||
from: *from,
|
from: *from,
|
||||||
to: (
|
to: (
|
||||||
@ -410,14 +410,11 @@ impl From<&KindWithContent> for Option<Details> {
|
|||||||
}),
|
}),
|
||||||
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }),
|
||||||
KindWithContent::SnapshotCreation => None,
|
KindWithContent::SnapshotCreation => None,
|
||||||
KindWithContent::Export { url, api_key, indexes: _, skip_embeddings } => {
|
KindWithContent::Export { url, api_key, indexes } => Some(Details::Export {
|
||||||
Some(Details::Export {
|
url: url.clone(),
|
||||||
url: url.clone(),
|
api_key: api_key.clone(),
|
||||||
api_key: api_key.clone(),
|
indexes: indexes.into_iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(),
|
||||||
exported_documents: BTreeMap::default(),
|
}),
|
||||||
skip_embeddings: skip_embeddings.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
|
||||||
from: *from,
|
from: *from,
|
||||||
to: (
|
to: (
|
||||||
@ -684,8 +681,7 @@ pub enum Details {
|
|||||||
Export {
|
Export {
|
||||||
url: String,
|
url: String,
|
||||||
api_key: Option<String>,
|
api_key: Option<String>,
|
||||||
exported_documents: BTreeMap<String, u32>,
|
indexes: BTreeMap<IndexUidPattern, DetailsExportIndexSettings>,
|
||||||
skip_embeddings: bool,
|
|
||||||
},
|
},
|
||||||
UpgradeDatabase {
|
UpgradeDatabase {
|
||||||
from: (u32, u32, u32),
|
from: (u32, u32, u32),
|
||||||
@ -693,6 +689,23 @@ pub enum Details {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Export settings of one index pattern as reported in `Details::Export`, together with
/// optional document counters.
///
/// NOTE(review): only the utoipa schema is renamed to camelCase here; no
/// `#[serde(rename_all)]` is present, so `matched_documents` / `exported_documents`
/// presumably serialize in snake_case while the flattened settings use camelCase.
/// Confirm this is intended.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
|
||||||
|
#[schema(rename_all = "camelCase")]
|
||||||
|
pub struct DetailsExportIndexSettings {
|
||||||
|
/// The settings the task was registered with, flattened into this object when serialized.
#[serde(flatten)]
|
||||||
|
settings: ExportIndexSettings,
|
||||||
|
/// Omitted from the serialized output while `None`.
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
matched_documents: Option<u64>,
|
||||||
|
/// Omitted from the serialized output while `None`.
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
exported_documents: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ExportIndexSettings> for DetailsExportIndexSettings {
    /// Wraps the registered settings, leaving both document counters unset.
    fn from(settings: ExportIndexSettings) -> Self {
        Self { settings, matched_documents: None, exported_documents: None }
    }
}
|
||||||
|
|
||||||
impl Details {
|
impl Details {
|
||||||
pub fn to_failed(&self) -> Self {
|
pub fn to_failed(&self) -> Self {
|
||||||
let mut details = self.clone();
|
let mut details = self.clone();
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use actix_web::web::{self, Data};
|
use actix_web::web::{self, Data};
|
||||||
use actix_web::{HttpRequest, HttpResponse};
|
use actix_web::{HttpRequest, HttpResponse};
|
||||||
use deserr::actix_web::AwebJson;
|
use deserr::actix_web::AwebJson;
|
||||||
@ -8,7 +10,7 @@ use meilisearch_types::error::deserr_codes::*;
|
|||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::ResponseError;
|
||||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||||
use meilisearch_types::keys::actions;
|
use meilisearch_types::keys::actions;
|
||||||
use meilisearch_types::tasks::KindWithContent;
|
use meilisearch_types::tasks::{ExportIndexSettings as DbExportIndexSettings, KindWithContent};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
use utoipa::{OpenApi, ToSchema};
|
use utoipa::{OpenApi, ToSchema};
|
||||||
@ -69,8 +71,17 @@ async fn export(
|
|||||||
let export = export.into_inner();
|
let export = export.into_inner();
|
||||||
debug!(returns = ?export, "Trigger export");
|
debug!(returns = ?export, "Trigger export");
|
||||||
|
|
||||||
let Export { url, api_key, indexes, skip_embeddings } = export;
|
let Export { url, api_key, indexes } = export;
|
||||||
let task = KindWithContent::Export { url, api_key, indexes, skip_embeddings };
|
let task = KindWithContent::Export {
|
||||||
|
url,
|
||||||
|
api_key,
|
||||||
|
indexes: indexes
|
||||||
|
.into_iter()
|
||||||
|
.map(|(pattern, ExportIndexSettings { skip_embeddings, filter })| {
|
||||||
|
(pattern, DbExportIndexSettings { skip_embeddings, filter })
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
};
|
||||||
let uid = get_task_id(&req, &opt)?;
|
let uid = get_task_id(&req, &opt)?;
|
||||||
let dry_run = is_dry_run(&req, &opt)?;
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
@ -95,11 +106,22 @@ pub struct Export {
|
|||||||
#[deserr(default, error = DeserrJsonError<InvalidExportApiKey>)]
|
#[deserr(default, error = DeserrJsonError<InvalidExportApiKey>)]
|
||||||
pub api_key: Option<String>,
|
pub api_key: Option<String>,
|
||||||
#[schema(value_type = Option<BTreeSet<String>>, example = json!(["movies", "steam-*"]))]
|
#[schema(value_type = Option<BTreeSet<String>>, example = json!(["movies", "steam-*"]))]
|
||||||
#[deserr(default, error = DeserrJsonError<InvalidExportIndexesPatterns>)]
|
#[deserr(default)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub indexes: Vec<IndexUidPattern>,
|
pub indexes: BTreeMap<IndexUidPattern, ExportIndexSettings>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||||
|
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
#[schema(rename_all = "camelCase")]
|
||||||
|
pub struct ExportIndexSettings {
|
||||||
#[schema(value_type = Option<bool>, example = json!("true"))]
|
#[schema(value_type = Option<bool>, example = json!("true"))]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
#[deserr(default, error = DeserrJsonError<InvalidExportSkipEmbeddings>)]
|
#[deserr(default, error = DeserrJsonError<InvalidExportIndexSkipEmbeddings>)]
|
||||||
pub skip_embeddings: bool,
|
pub skip_embeddings: bool,
|
||||||
|
#[schema(value_type = Option<String>, example = json!("genres = action"))]
|
||||||
|
#[serde(default)]
|
||||||
|
#[deserr(default, error = DeserrJsonError<InvalidExportIndexFilter>)]
|
||||||
|
pub filter: Option<String>,
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user