Introduce a new route to export documents and enqueue the export task

Author: Clément Renault (2025-06-12 16:23:48 +02:00), committed by Kerollmops
parent ae8c1461e1
commit e74c3b692a
14 changed files with 303 additions and 10 deletions


@@ -4,6 +4,7 @@ use std::io;
use dump::{KindDump, TaskDump, UpdateFile};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::RwTxn;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::milli;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
@@ -211,6 +212,19 @@ impl<'a> Dump<'a> {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
KindDump::Export { url, indexes, skip_embeddings, api_key } => {
KindWithContent::Export {
url,
indexes: indexes
.into_iter()
.map(|index| {
IndexUidPattern::try_from(index).map_err(|_| Error::CorruptedDump)
})
.collect::<Result<Vec<_>, Error>>()?,
skip_embeddings,
api_key,
}
}
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
},
};
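
The KindDump::Export arm above converts the dump representation back into a schedulable KindWithContent::Export. The variant itself lives in the dump crate and is not part of the hunks shown here; a minimal sketch of its assumed shape, inferred from the fields destructured above (the scheduler converts each indexes entry into an IndexUidPattern, so plain strings are assumed in the dump format):

// Sketch only: field types are inferred from their usage in this hunk,
// not copied from the dump crate.
pub enum KindDump {
    // ... existing variants (DumpCreation, SnapshotCreation, UpgradeDatabase, ...) ...
    Export {
        url: String,
        indexes: Vec<String>,
        skip_embeddings: bool,
        api_key: Option<String>,
    },
}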


@@ -289,6 +289,9 @@ fn snapshot_details(d: &Details) -> String {
Details::IndexSwap { swaps } => {
format!("{{ swaps: {swaps:?} }}")
}
Details::Export { url, api_key, exported_documents, skip_embeddings } => {
format!("{{ url: {url:?}, api_key: {api_key:?}, exported_documents: {exported_documents:?}, skip_embeddings: {skip_embeddings:?} }}")
}
Details::UpgradeDatabase { from, to } => {
format!("{{ from: {from:?}, to: {to:?} }}")
}
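
This snapshot formatter implies a matching Details::Export variant in meilisearch-types, added elsewhere in this commit's 14 files. A rough sketch of its shape; the type of exported_documents is an assumption (a per-index document count seems most plausible):

use std::collections::BTreeMap;

// Sketch only: field types are inferred from the snapshot formatting above,
// not taken from the actual meilisearch-types change.
pub enum Details {
    // ... existing variants ...
    Export {
        url: String,
        api_key: Option<String>,
        exported_documents: BTreeMap<String, u64>, // assumed: documents exported per index
        skip_embeddings: bool,
    },
}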


@@ -175,8 +175,17 @@ make_enum_progress! {
}
}
make_enum_progress! {
pub enum Export {
EnsuringCorrectnessOfTheTarget,
ExportTheSettings,
ExportTheDocuments,
}
}
make_atomic_progress!(Task alias AtomicTaskStep => "task" );
make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
make_atomic_progress!(Index alias AtomicIndexStep => "index" );
make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );
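
The three Export steps declared above are what the scheduler reports while an export batch runs; the processing hunk later in this commit calls update_progress(Export::EnsuringCorrectnessOfTheTarget). A minimal usage sketch, assuming the Progress handle re-exported through meilisearch_types::milli::progress:

use meilisearch_types::milli::progress::Progress;

use crate::processing::Export;

// Minimal sketch: how the generated steps would be reported while processing
// an export batch. The three variants map to the phases declared above.
fn report_export_phases(progress: &Progress) {
    progress.update_progress(Export::EnsuringCorrectnessOfTheTarget);
    // ... check the remote target with the provided API key ...
    progress.update_progress(Export::ExportTheSettings);
    // ... send each index's settings ...
    progress.update_progress(Export::ExportTheDocuments);
    // ... stream the documents (and embeddings unless skip_embeddings) ...
}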


@@ -71,6 +71,7 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
| KindWithContent::Export { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::SnapshotCreation => {
panic!("The autobatcher should never be called with tasks that don't apply to an index.")


@@ -47,6 +47,9 @@ pub(crate) enum Batch {
IndexSwap {
task: Task,
},
Export {
task: Task,
},
UpgradeDatabase {
tasks: Vec<Task>,
},
@@ -103,6 +106,7 @@ impl Batch {
Batch::TaskCancelation { task, .. }
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::Export { task }
| Batch::IndexUpdate { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
@@ -142,6 +146,7 @@ impl Batch {
| TaskDeletions(_)
| SnapshotCreation(_)
| Dump(_)
| Export { .. }
| UpgradeDatabase { .. }
| IndexSwap { .. } => None,
IndexOperation { op, .. } => Some(op.index_uid()),
@@ -167,6 +172,7 @@ impl fmt::Display for Batch {
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
Batch::Export { .. } => f.write_str("Export")?,
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
};
match index_uid {
@@ -426,9 +432,10 @@ impl IndexScheduler {
/// 0. We get the *last* task to cancel.
/// 1. We get the tasks to upgrade.
/// 2. We get the *next* task to delete.
/// 3. We get the *next* snapshot to process.
/// 4. We get the *next* dump to process.
/// 5. We get the *next* tasks to process for a specific index.
/// 3. We get the *next* export to process.
/// 4. We get the *next* snapshot to process.
/// 5. We get the *next* dump to process.
/// 6. We get the *next* tasks to process for a specific index.
#[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
pub(crate) fn create_next_batch(
&self,
@@ -500,7 +507,17 @@ impl IndexScheduler {
return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
}
// 3. we batch the snapshot.
// 3. we batch the export.
let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued;
if !to_export.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_export)?;
current_batch.processing(&mut tasks);
let task = tasks.pop().expect("There must be only one export task");
current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::Export });
return Ok(Some((Batch::Export { task }, current_batch)));
}
// 4. we batch the snapshot.
let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
if !to_snapshot.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
@@ -510,7 +527,7 @@ impl IndexScheduler {
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
}
// 4. we batch the dumps.
// 5. we batch the dumps.
let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
if let Some(to_dump) = to_dump.min() {
let mut task =
@@ -523,7 +540,7 @@ impl IndexScheduler {
return Ok(Some((Batch::Dump(task), current_batch)));
}
// 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
// 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;


@@ -1,6 +1,7 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use std::time::Duration;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
@@ -13,9 +14,9 @@ use roaring::RoaringBitmap;
use super::create_batch::Batch;
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, Export,
FinalizingIndexStep, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress,
TaskDeletionProgress, UpdateIndexProgress,
};
use crate::utils::{
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
@@ -361,6 +362,23 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::Export { mut task } => {
progress.update_progress(Export::EnsuringCorrectnessOfTheTarget);
// TODO send check requests with the API Key
let mut wtxn = self.env.write_txn()?;
let KindWithContent::Export { url, indexes, skip_embeddings, api_key } = &task.kind
else {
unreachable!()
};
eprintln!("Exporting data to {}...", url);
std::thread::sleep(Duration::from_secs(30));
task.status = Status::Succeeded;
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::UpgradeDatabase { mut tasks } => {
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
unreachable!();


@@ -273,6 +273,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
K::TaskCancelation { .. }
| K::TaskDeletion { .. }
| K::DumpCreation { .. }
| K::Export { .. } // TODO I have patterns, not index uids
| K::UpgradeDatabase { .. }
| K::SnapshotCreation => (),
};
@@ -600,6 +601,14 @@ impl crate::IndexScheduler {
Details::Dump { dump_uid: _ } => {
assert_eq!(kind.as_kind(), Kind::DumpCreation);
}
Details::Export {
url: _,
api_key: _,
exported_documents: _,
skip_embeddings: _,
} => {
assert_eq!(kind.as_kind(), Kind::Export);
}
Details::UpgradeDatabase { from: _, to: _ } => {
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
}
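
The route that actually enqueues the export task (the other half of this commit's title) is among the 14 changed files but not shown in these hunks. A hypothetical sketch of the enqueueing side, assuming the scheduler's usual register(kind, task_id, dry_run) entry point; the helper name and exact signature are illustrative, only the KindWithContent::Export fields are taken from the hunks above:

use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::tasks::{KindWithContent, Task};

use crate::{Error, IndexScheduler};

// Hypothetical sketch: builds the new task kind and hands it to the scheduler.
fn enqueue_export(
    scheduler: &IndexScheduler,
    url: String,
    indexes: Vec<IndexUidPattern>,
    skip_embeddings: bool,
    api_key: Option<String>,
) -> Result<Task, Error> {
    let task = KindWithContent::Export { url, indexes, skip_embeddings, api_key };
    // Once enqueued, create_next_batch picks this task up right after task
    // deletions and before snapshots and dumps, as shown earlier in this commit.
    scheduler.register(task, None, false)
}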