mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 11:57:07 +02:00
Working first implementation

parent e74c3b692a
commit e023ee4b6b

15 changed files with 298 additions and 103 deletions
@@ -212,19 +212,20 @@ impl<'a> Dump<'a> {
                     KindWithContent::DumpCreation { keys, instance_uid }
                 }
                 KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
-                KindDump::Export { url, indexes, skip_embeddings, api_key } => {
-                    KindWithContent::Export {
-                        url,
-                        indexes: indexes
-                            .into_iter()
-                            .map(|index| {
-                                IndexUidPattern::try_from(index).map_err(|_| Error::CorruptedDump)
-                            })
-                            .collect::<Result<Vec<_>, Error>>()?,
-                        skip_embeddings,
-                        api_key,
-                    }
-                }
+                KindDump::Export { url, indexes, api_key } => KindWithContent::Export {
+                    url,
+                    api_key,
+                    indexes: indexes
+                        .into_iter()
+                        .map(|(pattern, settings)| {
+                            Ok((
+                                IndexUidPattern::try_from(pattern)
+                                    .map_err(|_| Error::CorruptedDump)?,
+                                settings,
+                            ))
+                        })
+                        .collect::<Result<_, Error>>()?,
+                },
                 KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
             },
         };

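The `Export` dump payload changes shape here: `indexes` was a flat list of pattern strings plus a single task-level `skip_embeddings` flag, and it becomes a map from index pattern to per-index settings, with each key validated on the way in. A standalone sketch of that fallible key-conversion pattern, with `Pattern` and `CorruptedDump` as hypothetical stand-ins for `IndexUidPattern` and `Error::CorruptedDump`, and `u32` standing in for the settings type:

use std::collections::BTreeMap;

// `Pattern` and `CorruptedDump` are hypothetical stand-ins, not the real types.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Pattern(String);

#[derive(Debug)]
struct CorruptedDump;

impl TryFrom<String> for Pattern {
    type Error = ();

    fn try_from(s: String) -> Result<Self, ()> {
        // The real validation rejects malformed index uid patterns.
        if s.is_empty() { Err(()) } else { Ok(Pattern(s)) }
    }
}

// Convert every dumped (pattern, settings) pair; the first malformed pattern
// aborts the whole collect, exactly like the `collect::<Result<_, Error>>()?`
// in the hunk above.
fn convert(indexes: BTreeMap<String, u32>) -> Result<BTreeMap<Pattern, u32>, CorruptedDump> {
    indexes
        .into_iter()
        .map(|(pattern, settings)| {
            Ok((Pattern::try_from(pattern).map_err(|_| CorruptedDump)?, settings))
        })
        .collect()
}

fn main() {
    let dumped = BTreeMap::from([("movies*".to_string(), 0u32)]);
    assert!(convert(dumped).is_ok());
}
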
@@ -151,6 +151,8 @@ pub enum Error {
     CorruptedTaskQueue,
     #[error(transparent)]
     DatabaseUpgrade(Box<Self>),
+    #[error(transparent)]
+    Export(Box<Self>),
     #[error("Failed to rollback for index `{index}`: {rollback_outcome} ")]
     RollbackFailed { index: String, rollback_outcome: RollbackOutcome },
     #[error(transparent)]

@@ -221,6 +223,7 @@ impl Error {
             | Error::IoError(_)
             | Error::Persist(_)
             | Error::FeatureNotEnabled(_)
+            | Error::Export(_)
             | Error::Anyhow(_) => true,
             Error::CreateBatch(_)
             | Error::CorruptedTaskQueue

@@ -294,6 +297,7 @@ impl ErrorCode for Error {
             Error::CorruptedTaskQueue => Code::Internal,
             Error::CorruptedDump => Code::Internal,
             Error::DatabaseUpgrade(_) => Code::Internal,
+            Error::Export(_) => Code::Internal,
             Error::RollbackFailed { .. } => Code::Internal,
             Error::UnrecoverableError(_) => Code::Internal,
             Error::IndexSchedulerVersionMismatch { .. } => Code::Internal,

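Three small additions register the new variant: the enum gains `Export(Box<Self>)`, the variant joins the same match arm as `Error::Anyhow(_)`, and it maps to `Code::Internal`. A minimal standalone sketch of how a `#[error(transparent)]` boxed wrapper behaves (assumed shapes, reusing the thiserror attributes visible in the diff):

use thiserror::Error;

#[derive(Debug, Error)]
enum Error {
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    // Transparent: Display and source() defer entirely to the boxed inner
    // error; the Box keeps the recursive enum from being infinitely sized.
    #[error(transparent)]
    Export(Box<Error>),
}

fn export_step() -> Result<(), Error> {
    Err(std::io::Error::new(std::io::ErrorKind::Other, "connection refused").into())
}

fn process() -> Result<(), Error> {
    // Mirrors `Error::Export(Box::new(e))` in the process_batch hunk below.
    export_step().map_err(|e| Error::Export(Box::new(e)))
}

fn main() {
    // Prints "i/o error: connection refused": the wrapper adds no text of its own.
    println!("{}", process().unwrap_err());
}
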
@@ -289,8 +289,8 @@ fn snapshot_details(d: &Details) -> String {
         Details::IndexSwap { swaps } => {
             format!("{{ swaps: {swaps:?} }}")
         }
-        Details::Export { url, api_key, exported_documents, skip_embeddings } => {
-            format!("{{ url: {url:?}, api_key: {api_key:?}, exported_documents: {exported_documents:?}, skip_embeddings: {skip_embeddings:?} }}")
+        Details::Export { url, api_key, indexes } => {
+            format!("{{ url: {url:?}, api_key: {api_key:?}, indexes: {indexes:?} }}")
         }
         Details::UpgradeDatabase { from, to } => {
             format!("{{ from: {from:?}, to: {to:?} }}")

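`snapshot_details` now Debug-formats the new `indexes` field instead of the removed `exported_documents` and `skip_embeddings`. As a small aside on the format strings above: doubled braces escape to literal braces, while `{x:?}` interpolates Debug. A tiny standalone check:

fn main() {
    let url = "http://localhost:7700";
    // `{{` and `}}` render as literal braces; `{url:?}` renders with quotes.
    let s = format!("{{ url: {url:?} }}");
    assert_eq!(s, r#"{ url: "http://localhost:7700" }"#);
}
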
@@ -4,6 +4,7 @@ mod autobatcher_test;
 mod create_batch;
 mod process_batch;
 mod process_dump_creation;
+mod process_export;
 mod process_index_operation;
 mod process_snapshot_creation;
 mod process_upgrade;

@@ -1,7 +1,6 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::panic::{catch_unwind, AssertUnwindSafe};
 use std::sync::atomic::Ordering;
-use std::time::Duration;
 
 use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
 use meilisearch_types::heed::{RoTxn, RwTxn};

@@ -14,9 +13,9 @@ use roaring::RoaringBitmap;
 
 use super::create_batch::Batch;
 use crate::processing::{
-    AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, Export,
-    FinalizingIndexStep, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress,
-    TaskDeletionProgress, UpdateIndexProgress,
+    AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
+    InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
+    UpdateIndexProgress,
 };
 use crate::utils::{
     self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,

@@ -363,18 +362,32 @@ impl IndexScheduler {
                 Ok((vec![task], ProcessBatchInfo::default()))
             }
             Batch::Export { mut task } => {
-                progress.update_progress(Export::EnsuringCorrectnessOfTheTarget);
-
-                // TODO send check requests with the API Key
-
-                let mut wtxn = self.env.write_txn()?;
-                let KindWithContent::Export { url, indexes, skip_embeddings, api_key } = &task.kind
-                else {
+                let KindWithContent::Export { url, indexes, api_key } = &task.kind else {
                     unreachable!()
                 };
 
-                eprintln!("Exporting data to {}...", url);
-                std::thread::sleep(Duration::from_secs(30));
+                let ret = catch_unwind(AssertUnwindSafe(|| {
+                    self.process_export(url, indexes, api_key.as_deref(), progress)
+                }));
+
+                match ret {
+                    // TODO return the matched and exported documents
+                    Ok(Ok(())) => (),
+                    Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask),
+                    Ok(Err(e)) => return Err(Error::Export(Box::new(e))),
+                    Err(e) => {
+                        let msg = match e.downcast_ref::<&'static str>() {
+                            Some(s) => *s,
+                            None => match e.downcast_ref::<String>() {
+                                Some(s) => &s[..],
+                                None => "Box<dyn Any>",
+                            },
+                        };
+                        return Err(Error::Export(Box::new(Error::ProcessBatchPanicked(
+                            msg.to_string(),
+                        ))));
+                    }
+                }
 
                 task.status = Status::Succeeded;
                 Ok((vec![task], ProcessBatchInfo::default()))

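The new arm replaces the placeholder `eprintln!` plus 30-second sleep with a real call to `process_export`, run under `catch_unwind` so a panic inside the export does not take down the scheduler. On panic, the payload is a `Box<dyn Any + Send>` that usually holds either a `&'static str` or a `String`, so both downcasts are tried before falling back to an opaque label. A standalone sketch of that downcast pattern, with hypothetical names:

use std::panic::{catch_unwind, AssertUnwindSafe};

// Run `f`, converting any panic payload into a readable message. The default
// panic hook still prints the panic to stderr; only the return value changes.
fn run_and_describe<F: FnOnce()>(f: F) -> Result<(), String> {
    match catch_unwind(AssertUnwindSafe(f)) {
        Ok(()) => Ok(()),
        Err(payload) => {
            let msg = match payload.downcast_ref::<&'static str>() {
                Some(s) => *s,
                None => match payload.downcast_ref::<String>() {
                    Some(s) => s.as_str(),
                    None => "Box<dyn Any>",
                },
            };
            Err(msg.to_string())
        }
    }
}

fn main() {
    assert_eq!(run_and_describe(|| panic!("boom")), Err("boom".to_string()));
}
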
@@ -726,9 +739,11 @@ impl IndexScheduler {
                     from.1,
                     from.2
                 );
-                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+                let ret = catch_unwind(std::panic::AssertUnwindSafe(|| {
                     self.process_rollback(from, progress)
-                })) {
+                }));
+
+                match ret {
                     Ok(Ok(())) => {}
                     Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))),
                     Err(e) => {

crates/index-scheduler/src/scheduler/process_export.rs (new file, 141 additions)

@@ -0,0 +1,141 @@
+use std::collections::BTreeMap;
+use std::time::Duration;
+
+use meilisearch_types::index_uid_pattern::IndexUidPattern;
+use meilisearch_types::milli::progress::{Progress, VariableNameStep};
+use meilisearch_types::milli::{obkv_to_json, Filter};
+use meilisearch_types::settings::{self, SecretPolicy};
+use meilisearch_types::tasks::ExportIndexSettings;
+use ureq::{json, Agent};
+
+use crate::{Error, IndexScheduler, Result};
+
+impl IndexScheduler {
+    pub(super) fn process_export(
+        &self,
+        url: &str,
+        indexes: &BTreeMap<IndexUidPattern, ExportIndexSettings>,
+        api_key: Option<&str>,
+        progress: Progress,
+    ) -> Result<()> {
+        #[cfg(test)]
+        self.maybe_fail(crate::test_utils::FailureLocation::ProcessExport)?;
+
+        let indexes: Vec<_> = self
+            .index_names()?
+            .into_iter()
+            .flat_map(|uid| {
+                indexes
+                    .iter()
+                    .find(|(pattern, _)| pattern.matches_str(&uid))
+                    .map(|(_pattern, settings)| (uid, settings))
+            })
+            .collect();
+
+        let agent: Agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
+
+        for (i, (uid, settings)) in indexes.iter().enumerate() {
+            let must_stop_processing = self.scheduler.must_stop_processing.clone();
+            if must_stop_processing.get() {
+                return Err(Error::AbortedTask);
+            }
+
+            progress.update_progress(VariableNameStep::<ExportIndex>::new(
+                format!("Exporting index `{uid}`"),
+                i as u32,
+                indexes.len() as u32,
+            ));
+
+            let ExportIndexSettings { skip_embeddings, filter } = settings;
+            let index = self.index(uid)?;
+            let index_rtxn = index.read_txn()?;
+
+            // Send the primary key
+            let primary_key = index.primary_key(&index_rtxn).unwrap();
+            // TODO implement retry logic
+            let mut request = agent.post(&format!("{url}/indexes"));
+            if let Some(api_key) = api_key {
+                request = request.set("Authorization", &format!("Bearer {api_key}"));
+            }
+            request.send_json(&json!({ "uid": uid, "primaryKey": primary_key })).unwrap();
+
+            // Send the index settings
+            let settings = settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets)
+                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
+            // TODO implement retry logic
+            // improve error reporting (get error message)
+            let mut request = agent.patch(&format!("{url}/indexes/{uid}/settings"));
+            if let Some(api_key) = api_key {
+                request = request.set("Authorization", &format!("Bearer {api_key}"));
+            }
+            request.send_json(settings).unwrap();
+
+            let filter = filter
+                .as_deref()
+                .map(Filter::from_str)
+                .transpose()
+                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
+                .flatten();
+
+            let filter_universe = filter
+                .map(|f| f.evaluate(&index_rtxn, &index))
+                .transpose()
+                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
+            let whole_universe = index
+                .documents_ids(&index_rtxn)
+                .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
+            let universe = filter_universe.unwrap_or(whole_universe);
+
+            let fields_ids_map = index.fields_ids_map(&index_rtxn)?;
+            let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+            let embedding_configs = index
+                .embedding_configs(&index_rtxn)
+                .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
+
+            let limit = 50 * 1024 * 1024; // 50 MiB
+            let mut buffer = Vec::new();
+            let mut tmp_buffer = Vec::new();
+            for docid in universe {
+                let document = index
+                    .document(&index_rtxn, docid)
+                    .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
+
+                let value = obkv_to_json(&all_fields, &fields_ids_map, document)
+                    .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
+
+                tmp_buffer.clear();
+                serde_json::to_writer(&mut tmp_buffer, &value)
+                    .map_err(meilisearch_types::milli::InternalError::from)
+                    .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
+
+                if buffer.len() + tmp_buffer.len() > limit {
+                    // TODO implement retry logic
+                    post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
+                    buffer.clear();
+                }
+                buffer.extend_from_slice(&tmp_buffer);
+            }
+
+            post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
+        }
+
+        Ok(())
+    }
+}
+
+fn post_serialized_documents(
+    agent: &Agent,
+    url: &str,
+    uid: &str,
+    api_key: Option<&str>,
+    buffer: &[u8],
+) -> Result<ureq::Response, ureq::Error> {
+    let mut request = agent.post(&format!("{url}/indexes/{uid}/documents"));
+    request = request.set("Content-Type", "application/x-ndjson");
+    if let Some(api_key) = api_key {
+        request = request.set("Authorization", &format!("Bearer {api_key}"));
+    }
+    request.send_bytes(buffer)
+}
+
+enum ExportIndex {}

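Every network call above is marked `// TODO implement retry logic` and unwraps its response. One possible shape for that helper, sketched here as an assumption (it is not part of this commit): retry transport errors and 5xx responses with exponential backoff, and give up immediately on 4xx, since a client error will not succeed on retry.

use std::time::Duration;

use ureq::Agent;

// Hypothetical retry wrapper around the document POST; `max_attempts` and the
// backoff schedule are made-up parameters, not values from the codebase.
fn post_with_retry(
    agent: &Agent,
    url: &str,
    body: &[u8],
    max_attempts: u32,
) -> Result<ureq::Response, ureq::Error> {
    let mut attempt = 0;
    loop {
        let result = agent
            .post(url)
            .set("Content-Type", "application/x-ndjson")
            .send_bytes(body);
        match result {
            Ok(response) => return Ok(response),
            // A 4xx means the payload or credentials are wrong; retrying cannot help.
            Err(err @ ureq::Error::Status(code, _)) if code < 500 => return Err(err),
            // 5xx or a transport failure: back off (1s, 2s, 4s, ...) and try again.
            Err(err) => {
                attempt += 1;
                if attempt >= max_attempts {
                    return Err(err);
                }
                std::thread::sleep(Duration::from_secs(1u64 << (attempt - 1)));
            }
        }
    }
}

Separately, the `enum ExportIndex {}` at the bottom of the file is an uninhabited marker type: it is never instantiated and only gives `VariableNameStep::<ExportIndex>` a distinct type parameter to namespace this progress step.
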
@@ -37,6 +37,7 @@ pub(crate) enum FailureLocation {
     InsideCreateBatch,
     InsideProcessBatch,
     PanicInsideProcessBatch,
+    ProcessExport,
     ProcessUpgrade,
     AcquiringWtxn,
     UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 },

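`ProcessExport` joins the test-only failure-injection points consumed by `maybe_fail`, called at the top of `process_export`. A standalone analogue of how such a hook typically works; the real `maybe_fail` implementation is not shown in this diff, so the names and shapes here are assumptions.

#[derive(Debug, PartialEq, Eq)]
enum FailureLocation {
    ProcessExport,
    ProcessUpgrade,
}

struct Scheduler {
    // Locations the current test has armed to fail.
    planned_failures: Vec<FailureLocation>,
}

impl Scheduler {
    // Analogue of `self.maybe_fail(...)`: error out when the test planned a
    // failure at this location, otherwise proceed. In the real code this path
    // is only compiled for tests (note the #[cfg(test)] in process_export).
    fn maybe_fail(&self, location: FailureLocation) -> Result<(), String> {
        if self.planned_failures.contains(&location) {
            Err(format!("planned failure at {location:?}"))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let scheduler = Scheduler { planned_failures: vec![FailureLocation::ProcessExport] };
    assert!(scheduler.maybe_fail(FailureLocation::ProcessExport).is_err());
    assert!(scheduler.maybe_fail(FailureLocation::ProcessUpgrade).is_ok());
}
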
@@ -601,12 +601,7 @@ impl crate::IndexScheduler {
             Details::Dump { dump_uid: _ } => {
                 assert_eq!(kind.as_kind(), Kind::DumpCreation);
             }
-            Details::Export {
-                url: _,
-                api_key: _,
-                exported_documents: _,
-                skip_embeddings: _,
-            } => {
+            Details::Export { url: _, api_key: _, indexes: _ } => {
                 assert_eq!(kind.as_kind(), Kind::Export);
             }
             Details::UpgradeDatabase { from: _, to: _ } => {