diff --git a/Cargo.lock b/Cargo.lock index 7455ff1b4..be6aa4b21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2997,6 +2997,7 @@ name = "index-scheduler" version = "1.15.2" dependencies = [ "anyhow", + "backoff", "big_s", "bincode", "bumpalo", @@ -3854,6 +3855,7 @@ dependencies = [ "anyhow", "bumpalo", "bumparaw-collections", + "byte-unit", "convert_case 0.8.0", "csv", "deserr", diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index a84ec4ba5..81ba40944 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -1,12 +1,17 @@ #![allow(clippy::type_complexity)] #![allow(clippy::wrong_self_convention)] +use std::collections::BTreeMap; + use meilisearch_types::batches::BatchId; +use meilisearch_types::byte_unit::Byte; use meilisearch_types::error::ResponseError; use meilisearch_types::keys::Key; use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::settings::Unchecked; -use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task, TaskId}; +use meilisearch_types::tasks::{ + Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId, +}; use meilisearch_types::InstanceUid; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; @@ -141,6 +146,12 @@ pub enum KindDump { instance_uid: Option, }, SnapshotCreation, + Export { + url: String, + api_key: Option, + payload_size: Option, + indexes: BTreeMap, + }, UpgradeDatabase { from: (u32, u32, u32), }, @@ -213,6 +224,15 @@ impl From for KindDump { KindDump::DumpCreation { keys, instance_uid } } KindWithContent::SnapshotCreation => KindDump::SnapshotCreation, + KindWithContent::Export { url, api_key, payload_size, indexes } => KindDump::Export { + url, + api_key, + payload_size, + indexes: indexes + .into_iter() + .map(|(pattern, settings)| (pattern.to_string(), settings)) + .collect(), + }, KindWithContent::UpgradeDatabase { from: version } => { KindDump::UpgradeDatabase { from: version } } diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index f4901b2f2..de0d01935 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -44,6 +44,7 @@ time = { version = "0.3.41", features = [ tracing = "0.1.41" ureq = "2.12.1" uuid = { version = "1.17.0", features = ["serde", "v4"] } +backoff = "0.4.0" [dev-dependencies] big_s = "1.0.2" diff --git a/crates/index-scheduler/src/dump.rs b/crates/index-scheduler/src/dump.rs index ca26e50c8..1e681c8e8 100644 --- a/crates/index-scheduler/src/dump.rs +++ b/crates/index-scheduler/src/dump.rs @@ -4,6 +4,7 @@ use std::io; use dump::{KindDump, TaskDump, UpdateFile}; use meilisearch_types::batches::{Batch, BatchId}; use meilisearch_types::heed::RwTxn; +use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::milli; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; @@ -211,6 +212,23 @@ impl<'a> Dump<'a> { KindWithContent::DumpCreation { keys, instance_uid } } KindDump::SnapshotCreation => KindWithContent::SnapshotCreation, + KindDump::Export { url, api_key, payload_size, indexes } => { + KindWithContent::Export { + url, + api_key, + payload_size, + indexes: indexes + .into_iter() + .map(|(pattern, settings)| { + Ok(( + IndexUidPattern::try_from(pattern) + .map_err(|_| Error::CorruptedDump)?, + settings, + )) + }) + .collect::>()?, + } + } KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from }, }, }; diff --git a/crates/index-scheduler/src/error.rs 
b/crates/index-scheduler/src/error.rs index cb798b385..60669ff2d 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -151,6 +151,10 @@ pub enum Error { CorruptedTaskQueue, #[error(transparent)] DatabaseUpgrade(Box), + #[error(transparent)] + Export(Box), + #[error("Failed to export documents to remote server {code} ({type}): {message} <{link}>")] + FromRemoteWhenExporting { message: String, code: String, r#type: String, link: String }, #[error("Failed to rollback for index `{index}`: {rollback_outcome} ")] RollbackFailed { index: String, rollback_outcome: RollbackOutcome }, #[error(transparent)] @@ -212,6 +216,7 @@ impl Error { | Error::BatchNotFound(_) | Error::TaskDeletionWithEmptyQuery | Error::TaskCancelationWithEmptyQuery + | Error::FromRemoteWhenExporting { .. } | Error::AbortedTask | Error::Dump(_) | Error::Heed(_) @@ -221,6 +226,7 @@ impl Error { | Error::IoError(_) | Error::Persist(_) | Error::FeatureNotEnabled(_) + | Error::Export(_) | Error::Anyhow(_) => true, Error::CreateBatch(_) | Error::CorruptedTaskQueue @@ -282,6 +288,7 @@ impl ErrorCode for Error { Error::Dump(e) => e.error_code(), Error::Milli { error, .. } => error.error_code(), Error::ProcessBatchPanicked(_) => Code::Internal, + Error::FromRemoteWhenExporting { .. } => Code::Internal, Error::Heed(e) => e.error_code(), Error::HeedTransaction(e) => e.error_code(), Error::FileStore(e) => e.error_code(), @@ -294,6 +301,7 @@ impl ErrorCode for Error { Error::CorruptedTaskQueue => Code::Internal, Error::CorruptedDump => Code::Internal, Error::DatabaseUpgrade(_) => Code::Internal, + Error::Export(_) => Code::Internal, Error::RollbackFailed { .. } => Code::Internal, Error::UnrecoverableError(_) => Code::Internal, Error::IndexSchedulerVersionMismatch { .. } => Code::Internal, diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index a5bb1ea56..0cbbb2514 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -289,6 +289,9 @@ fn snapshot_details(d: &Details) -> String { Details::IndexSwap { swaps } => { format!("{{ swaps: {swaps:?} }}") } + Details::Export { url, api_key, payload_size, indexes } => { + format!("{{ url: {url:?}, api_key: {api_key:?}, payload_size: {payload_size:?}, indexes: {indexes:?} }}") + } Details::UpgradeDatabase { from, to } => { format!("{{ from: {from:?}, to: {to:?} }}") } diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index f23b811e5..2aa7cf859 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -175,8 +175,17 @@ make_enum_progress! { } } +make_enum_progress! { + pub enum Export { + EnsuringCorrectnessOfTheTarget, + ExportingTheSettings, + ExportingTheDocuments, + } +} + make_atomic_progress!(Task alias AtomicTaskStep => "task" ); make_atomic_progress!(Document alias AtomicDocumentStep => "document" ); +make_atomic_progress!(Index alias AtomicIndexStep => "index" ); make_atomic_progress!(Batch alias AtomicBatchStep => "batch" ); make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" ); diff --git a/crates/index-scheduler/src/scheduler/autobatcher.rs b/crates/index-scheduler/src/scheduler/autobatcher.rs index b57983291..b3f7d2743 100644 --- a/crates/index-scheduler/src/scheduler/autobatcher.rs +++ b/crates/index-scheduler/src/scheduler/autobatcher.rs @@ -71,6 +71,7 @@ impl From for AutobatchKind { KindWithContent::TaskCancelation { .. 
} | KindWithContent::TaskDeletion { .. } | KindWithContent::DumpCreation { .. } + | KindWithContent::Export { .. } | KindWithContent::UpgradeDatabase { .. } | KindWithContent::SnapshotCreation => { panic!("The autobatcher should never be called with tasks that don't apply to an index.") diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index a5bc1ec6f..e78ed2c2e 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -48,6 +48,9 @@ pub(crate) enum Batch { IndexSwap { task: Task, }, + Export { + task: Task, + }, UpgradeDatabase { tasks: Vec, }, @@ -104,6 +107,7 @@ impl Batch { Batch::TaskCancelation { task, .. } | Batch::Dump(task) | Batch::IndexCreation { task, .. } + | Batch::Export { task } | Batch::IndexUpdate { task, .. } => { RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap() } @@ -143,6 +147,7 @@ impl Batch { | TaskDeletions(_) | SnapshotCreation(_) | Dump(_) + | Export { .. } | UpgradeDatabase { .. } | IndexSwap { .. } => None, IndexOperation { op, .. } => Some(op.index_uid()), @@ -168,6 +173,7 @@ impl fmt::Display for Batch { Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?, Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?, Batch::IndexSwap { .. } => f.write_str("IndexSwap")?, + Batch::Export { .. } => f.write_str("Export")?, Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?, }; match index_uid { @@ -427,9 +433,10 @@ impl IndexScheduler { /// 0. We get the *last* task to cancel. /// 1. We get the tasks to upgrade. /// 2. We get the *next* task to delete. - /// 3. We get the *next* snapshot to process. - /// 4. We get the *next* dump to process. - /// 5. We get the *next* tasks to process for a specific index. + /// 3. We get the *next* export to process. + /// 4. We get the *next* snapshot to process. + /// 5. We get the *next* dump to process. + /// 6. We get the *next* tasks to process for a specific index. #[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")] pub(crate) fn create_next_batch( &self, @@ -501,7 +508,17 @@ impl IndexScheduler { return Ok(Some((Batch::TaskDeletions(tasks), current_batch))); } - // 3. we batch the snapshot. + // 3. we batch the export. + let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued; + if !to_export.is_empty() { + let task_id = to_export.iter().next().expect("There must be at least one export task"); + let mut task = self.queue.tasks.get_task(rtxn, task_id)?.unwrap(); + current_batch.processing([&mut task]); + current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::Export }); + return Ok(Some((Batch::Export { task }, current_batch))); + } + + // 4. we batch the snapshot. let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued; if !to_snapshot.is_empty() { let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?; @@ -511,7 +528,7 @@ impl IndexScheduler { return Ok(Some((Batch::SnapshotCreation(tasks), current_batch))); } - // 4. we batch the dumps. + // 5. we batch the dumps. let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued; if let Some(to_dump) = to_dump.min() { let mut task = @@ -524,7 +541,7 @@ impl IndexScheduler { return Ok(Some((Batch::Dump(task), current_batch))); } - // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. + // 6. 
We make a batch from the unprioritised tasks. Start by taking the next enqueued task. let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; let mut task = self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 0e258e27b..5ac591143 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -4,6 +4,7 @@ mod autobatcher_test; mod create_batch; mod process_batch; mod process_dump_creation; +mod process_export; mod process_index_operation; mod process_snapshot_creation; mod process_upgrade; diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index e6bf6f713..c21ab27ad 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -368,6 +368,46 @@ impl IndexScheduler { task.status = Status::Succeeded; Ok((vec![task], ProcessBatchInfo::default())) } + Batch::Export { mut task } => { + let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind + else { + unreachable!() + }; + + let ret = catch_unwind(AssertUnwindSafe(|| { + self.process_export( + url, + api_key.as_deref(), + payload_size.as_ref(), + indexes, + progress, + ) + })); + + let stats = match ret { + Ok(Ok(stats)) => stats, + Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask), + Ok(Err(e)) => return Err(Error::Export(Box::new(e))), + Err(e) => { + let msg = match e.downcast_ref::<&'static str>() { + Some(s) => *s, + None => match e.downcast_ref::() { + Some(s) => &s[..], + None => "Box", + }, + }; + return Err(Error::Export(Box::new(Error::ProcessBatchPanicked( + msg.to_string(), + )))); + } + }; + + task.status = Status::Succeeded; + if let Some(Details::Export { indexes, .. 
}) = task.details.as_mut() { + *indexes = stats; + } + Ok((vec![task], ProcessBatchInfo::default())) + } Batch::UpgradeDatabase { mut tasks } => { let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { unreachable!(); @@ -715,9 +755,11 @@ impl IndexScheduler { from.1, from.2 ); - match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + let ret = catch_unwind(std::panic::AssertUnwindSafe(|| { self.process_rollback(from, progress) - })) { + })); + + match ret { Ok(Ok(())) => {} Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))), Err(e) => { diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs new file mode 100644 index 000000000..30721065e --- /dev/null +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -0,0 +1,373 @@ +use std::collections::BTreeMap; +use std::io::{self, Write as _}; +use std::sync::atomic; +use std::time::Duration; + +use backoff::ExponentialBackoff; +use byte_unit::Byte; +use flate2::write::GzEncoder; +use flate2::Compression; +use meilisearch_types::index_uid_pattern::IndexUidPattern; +use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; +use meilisearch_types::milli::progress::{Progress, VariableNameStep}; +use meilisearch_types::milli::update::{request_threads, Setting}; +use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; +use meilisearch_types::milli::{self, obkv_to_json, Filter, InternalError}; +use meilisearch_types::settings::{self, SecretPolicy}; +use meilisearch_types::tasks::{DetailsExportIndexSettings, ExportIndexSettings}; +use serde::Deserialize; +use ureq::{json, Response}; + +use super::MustStopProcessing; +use crate::processing::AtomicDocumentStep; +use crate::{Error, IndexScheduler, Result}; + +impl IndexScheduler { + pub(super) fn process_export( + &self, + base_url: &str, + api_key: Option<&str>, + payload_size: Option<&Byte>, + indexes: &BTreeMap, + progress: Progress, + ) -> Result> { + #[cfg(test)] + self.maybe_fail(crate::test_utils::FailureLocation::ProcessExport)?; + + let indexes: Vec<_> = self + .index_names()? + .into_iter() + .flat_map(|uid| { + indexes + .iter() + .find(|(pattern, _)| pattern.matches_str(&uid)) + .map(|(pattern, settings)| (pattern, uid, settings)) + }) + .collect(); + + let mut output = BTreeMap::new(); + let agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build(); + let must_stop_processing = self.scheduler.must_stop_processing.clone(); + for (i, (_pattern, uid, export_settings)) in indexes.iter().enumerate() { + if must_stop_processing.get() { + return Err(Error::AbortedTask); + } + + progress.update_progress(VariableNameStep::::new( + format!("Exporting index `{uid}`"), + i as u32, + indexes.len() as u32, + )); + + let ExportIndexSettings { filter, override_settings } = export_settings; + let index = self.index(uid)?; + let index_rtxn = index.read_txn()?; + + // First, check if the index already exists + let url = format!("{base_url}/indexes/{uid}"); + let response = retry(&must_stop_processing, || { + let mut request = agent.get(&url); + if let Some(api_key) = api_key { + request = request.set("Authorization", &format!("Bearer {api_key}")); + } + + request.send_bytes(Default::default()).map_err(into_backoff_error) + }); + let index_exists = match response { + Ok(response) => response.status() == 200, + Err(Error::FromRemoteWhenExporting { code, .. 
}) if code == "index_not_found" => { + false + } + Err(e) => return Err(e), + }; + + let primary_key = index + .primary_key(&index_rtxn) + .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; + + // Create the index + if !index_exists { + let url = format!("{base_url}/indexes"); + retry(&must_stop_processing, || { + let mut request = agent.post(&url); + if let Some(api_key) = api_key { + request = request.set("Authorization", &format!("Bearer {api_key}")); + } + let index_param = json!({ "uid": uid, "primaryKey": primary_key }); + request.send_json(&index_param).map_err(into_backoff_error) + })?; + } + + // Patch the index primary key + if index_exists && *override_settings { + let url = format!("{base_url}/indexes/{uid}"); + retry(&must_stop_processing, || { + let mut request = agent.patch(&url); + if let Some(api_key) = api_key { + request = request.set("Authorization", &format!("Bearer {api_key}")); + } + let index_param = json!({ "primaryKey": primary_key }); + request.send_json(&index_param).map_err(into_backoff_error) + })?; + } + + // Send the index settings + if !index_exists || *override_settings { + let mut settings = + settings::settings(&index, &index_rtxn, SecretPolicy::RevealSecrets) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + // Remove the experimental chat setting if not enabled + if self.features().check_chat_completions("exporting chat settings").is_err() { + settings.chat = Setting::NotSet; + } + // Retry logic for sending settings + let url = format!("{base_url}/indexes/{uid}/settings"); + let bearer = api_key.map(|api_key| format!("Bearer {api_key}")); + retry(&must_stop_processing, || { + let mut request = agent.patch(&url); + if let Some(bearer) = bearer.as_ref() { + request = request.set("Authorization", bearer); + } + request.send_json(settings.clone()).map_err(into_backoff_error) + })?; + } + + let filter = filter + .as_ref() + .map(Filter::from_json) + .transpose() + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))? 
+ .flatten(); + + let filter_universe = filter + .map(|f| f.evaluate(&index_rtxn, &index)) + .transpose() + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + let whole_universe = index + .documents_ids(&index_rtxn) + .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; + let universe = filter_universe.unwrap_or(whole_universe); + + let fields_ids_map = index.fields_ids_map(&index_rtxn)?; + let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); + let embedding_configs = index + .embedding_configs(&index_rtxn) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + + // We don't need to keep this one alive as we will + // spawn many threads to process the documents + drop(index_rtxn); + + let total_documents = universe.len() as u32; + let (step, progress_step) = AtomicDocumentStep::new(total_documents); + progress.update_progress(progress_step); + + output.insert( + IndexUidPattern::new_unchecked(uid.clone()), + DetailsExportIndexSettings { + settings: (*export_settings).clone(), + matched_documents: Some(total_documents as u64), + }, + ); + + let limit = payload_size.map(|ps| ps.as_u64() as usize).unwrap_or(50 * 1024 * 1024); // defaults to 50 MiB + let documents_url = format!("{base_url}/indexes/{uid}/documents"); + + request_threads() + .broadcast(|ctx| { + let index_rtxn = index + .read_txn() + .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; + + let mut buffer = Vec::new(); + let mut tmp_buffer = Vec::new(); + let mut compressed_buffer = Vec::new(); + for (i, docid) in universe.iter().enumerate() { + if i % ctx.num_threads() != ctx.index() { + continue; + } + + let document = index + .document(&index_rtxn, docid) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + + let mut document = obkv_to_json(&all_fields, &fields_ids_map, document) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + + // TODO definitely factorize this code + 'inject_vectors: { + let embeddings = index + .embeddings(&index_rtxn, docid) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + + if embeddings.is_empty() { + break 'inject_vectors; + } + + let vectors = document + .entry(RESERVED_VECTORS_FIELD_NAME) + .or_insert(serde_json::Value::Object(Default::default())); + + let serde_json::Value::Object(vectors) = vectors else { + return Err(Error::from_milli( + milli::Error::UserError( + milli::UserError::InvalidVectorsMapType { + document_id: { + if let Ok(Some(Ok(index))) = index + .external_id_of( + &index_rtxn, + std::iter::once(docid), + ) + .map(|it| it.into_iter().next()) + { + index + } else { + format!("internal docid={docid}") + } + }, + value: vectors.clone(), + }, + ), + Some(uid.to_string()), + )); + }; + + for (embedder_name, embeddings) in embeddings { + let user_provided = embedding_configs + .iter() + .find(|conf| conf.name == embedder_name) + .is_some_and(|conf| conf.user_provided.contains(docid)); + + let embeddings = ExplicitVectors { + embeddings: Some( + VectorOrArrayOfVectors::from_array_of_vectors(embeddings), + ), + regenerate: !user_provided, + }; + vectors.insert( + embedder_name, + serde_json::to_value(embeddings).unwrap(), + ); + } + } + + tmp_buffer.clear(); + serde_json::to_writer(&mut tmp_buffer, &document) + .map_err(milli::InternalError::from) + .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; + + // Make sure we put at least one document in the buffer even + // though we might go above the buffer limit before sending + if !buffer.is_empty() && buffer.len() + 
tmp_buffer.len() > limit { + // We compress the documents before sending them + let mut encoder = + GzEncoder::new(&mut compressed_buffer, Compression::default()); + encoder + .write_all(&buffer) + .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?; + encoder + .finish() + .map_err(|e| Error::from_milli(e.into(), Some(uid.clone())))?; + + retry(&must_stop_processing, || { + let mut request = agent.post(&documents_url); + request = request.set("Content-Type", "application/x-ndjson"); + request = request.set("Content-Encoding", "gzip"); + if let Some(api_key) = api_key { + request = request + .set("Authorization", &format!("Bearer {api_key}")); + } + request.send_bytes(&compressed_buffer).map_err(into_backoff_error) + })?; + buffer.clear(); + compressed_buffer.clear(); + } + buffer.extend_from_slice(&tmp_buffer); + + if i % 100 == 0 { + step.fetch_add(100, atomic::Ordering::Relaxed); + } + } + + retry(&must_stop_processing, || { + let mut request = agent.post(&documents_url); + request = request.set("Content-Type", "application/x-ndjson"); + if let Some(api_key) = api_key { + request = request.set("Authorization", &format!("Bearer {api_key}")); + } + request.send_bytes(&buffer).map_err(into_backoff_error) + })?; + + Ok(()) + }) + .map_err(|e| { + Error::from_milli( + milli::Error::InternalError(InternalError::PanicInThreadPool(e)), + Some(uid.to_string()), + ) + })?; + + step.store(total_documents, atomic::Ordering::Relaxed); + } + + Ok(output) + } +} + +fn retry<F>(must_stop_processing: &MustStopProcessing, send_request: F) -> Result<ureq::Response> +where + F: Fn() -> Result<ureq::Response, backoff::Error<ureq::Error>>, +{ + match backoff::retry(ExponentialBackoff::default(), || { + if must_stop_processing.get() { + return Err(backoff::Error::Permanent(ureq::Error::Status( + u16::MAX, + // 444: Connection Closed Without Response + Response::new(444, "Abort", "Aborted task").unwrap(), + ))); + } + send_request() + }) { + Ok(response) => Ok(response), + Err(backoff::Error::Permanent(e)) => Err(ureq_error_into_error(e)), + Err(backoff::Error::Transient { err, retry_after: _ }) => Err(ureq_error_into_error(err)), + } +} + +fn into_backoff_error(err: ureq::Error) -> backoff::Error<ureq::Error> { + match err { + // These status codes must trigger an automatic retry + // + ureq::Error::Status(408 | 429 | 500 | 502 | 503 | 504, _) => { + backoff::Error::Transient { err, retry_after: None } + } + ureq::Error::Status(_, _) => backoff::Error::Permanent(err), + ureq::Error::Transport(_) => backoff::Error::Transient { err, retry_after: None }, + } +} + +/// Converts a `ureq::Error` into an `Error`. +fn ureq_error_into_error(error: ureq::Error) -> Error { + #[derive(Deserialize)] + struct MeiliError { + message: String, + code: String, + r#type: String, + link: String, + } + + match error { + // This is a workaround to handle task abortion - the error propagation path + // makes it difficult to cleanly surface the abortion at this level.
+ ureq::Error::Status(u16::MAX, _) => Error::AbortedTask, + ureq::Error::Status(_, response) => match response.into_json() { + Ok(MeiliError { message, code, r#type, link }) => { + Error::FromRemoteWhenExporting { message, code, r#type, link } + } + Err(e) => e.into(), + }, + ureq::Error::Transport(transport) => io::Error::new(io::ErrorKind::Other, transport).into(), + } +} + +enum ExportIndex {} diff --git a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs b/crates/index-scheduler/src/scheduler/process_upgrade.rs similarity index 100% rename from crates/index-scheduler/src/scheduler/process_upgrade/mod.rs rename to crates/index-scheduler/src/scheduler/process_upgrade.rs diff --git a/crates/index-scheduler/src/scheduler/test.rs b/crates/index-scheduler/src/scheduler/test.rs index 06bc14051..ee26165c7 100644 --- a/crates/index-scheduler/src/scheduler/test.rs +++ b/crates/index-scheduler/src/scheduler/test.rs @@ -732,6 +732,7 @@ fn basic_get_stats() { "documentDeletion": 0, "documentEdition": 0, "dumpCreation": 0, + "export": 0, "indexCreation": 3, "indexDeletion": 0, "indexSwap": 0, @@ -765,6 +766,7 @@ fn basic_get_stats() { "documentDeletion": 0, "documentEdition": 0, "dumpCreation": 0, + "export": 0, "indexCreation": 3, "indexDeletion": 0, "indexSwap": 0, @@ -805,6 +807,7 @@ fn basic_get_stats() { "documentDeletion": 0, "documentEdition": 0, "dumpCreation": 0, + "export": 0, "indexCreation": 3, "indexDeletion": 0, "indexSwap": 0, @@ -846,6 +849,7 @@ fn basic_get_stats() { "documentDeletion": 0, "documentEdition": 0, "dumpCreation": 0, + "export": 0, "indexCreation": 3, "indexDeletion": 0, "indexSwap": 0, diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index 5f206b55c..bfed7f53a 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -37,6 +37,7 @@ pub(crate) enum FailureLocation { InsideCreateBatch, InsideProcessBatch, PanicInsideProcessBatch, + ProcessExport, ProcessUpgrade, AcquiringWtxn, UpdatingTaskAfterProcessBatchSuccess { task_uid: u32 }, diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index ca37065ec..3c921f099 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -278,6 +278,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpCreation { .. } + | K::Export { .. } | K::UpgradeDatabase { .. 
} | K::SnapshotCreation => (), }; @@ -605,6 +606,9 @@ impl crate::IndexScheduler { Details::Dump { dump_uid: _ } => { assert_eq!(kind.as_kind(), Kind::DumpCreation); } + Details::Export { url: _, api_key: _, payload_size: _, indexes: _ } => { + assert_eq!(kind.as_kind(), Kind::Export); + } Details::UpgradeDatabase { from: _, to: _ } => { assert_eq!(kind.as_kind(), Kind::UpgradeDatabase); } diff --git a/crates/meilisearch-types/Cargo.toml b/crates/meilisearch-types/Cargo.toml index f76044078..faf59643f 100644 --- a/crates/meilisearch-types/Cargo.toml +++ b/crates/meilisearch-types/Cargo.toml @@ -15,6 +15,7 @@ actix-web = { version = "4.11.0", default-features = false } anyhow = "1.0.98" bumpalo = "3.18.1" bumparaw-collections = "0.1.4" +byte-unit = { version = "5.1.6", features = ["serde"] } convert_case = "0.8.0" csv = "1.3.1" deserr = { version = "0.6.3", features = ["actix-web"] } diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index d2500b7e1..30f6868f6 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -389,6 +389,13 @@ InvalidDocumentEditionContext , InvalidRequest , BAD_REQU InvalidDocumentEditionFunctionFilter , InvalidRequest , BAD_REQUEST ; EditDocumentsByFunctionError , InvalidRequest , BAD_REQUEST ; InvalidSettingsIndexChat , InvalidRequest , BAD_REQUEST ; +// Export +InvalidExportUrl , InvalidRequest , BAD_REQUEST ; +InvalidExportApiKey , InvalidRequest , BAD_REQUEST ; +InvalidExportPayloadSize , InvalidRequest , BAD_REQUEST ; +InvalidExportIndexesPatterns , InvalidRequest , BAD_REQUEST ; +InvalidExportIndexFilter , InvalidRequest , BAD_REQUEST ; +InvalidExportIndexOverrideSettings , InvalidRequest , BAD_REQUEST ; // Experimental features - Chat Completions UnimplementedExternalFunctionCalling , InvalidRequest , NOT_IMPLEMENTED ; UnimplementedNonStreamingChatCompletions , InvalidRequest , NOT_IMPLEMENTED ; diff --git a/crates/meilisearch-types/src/index_uid_pattern.rs b/crates/meilisearch-types/src/index_uid_pattern.rs index baf0249e2..f90fc7aee 100644 --- a/crates/meilisearch-types/src/index_uid_pattern.rs +++ b/crates/meilisearch-types/src/index_uid_pattern.rs @@ -12,7 +12,7 @@ use crate::index_uid::{IndexUid, IndexUidFormatError}; /// An index uid pattern is composed of only ascii alphanumeric characters, - and _, between 1 and 400 /// bytes long and optionally ending with a *. 
-#[derive(Serialize, Deserialize, Deserr, Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Serialize, Deserialize, Deserr, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[deserr(try_from(&String) = FromStr::from_str -> IndexUidPatternFormatError)] pub struct IndexUidPattern(String); diff --git a/crates/meilisearch-types/src/keys.rs b/crates/meilisearch-types/src/keys.rs index df2810727..3ba31c2cb 100644 --- a/crates/meilisearch-types/src/keys.rs +++ b/crates/meilisearch-types/src/keys.rs @@ -317,6 +317,9 @@ pub enum Action { #[serde(rename = "experimental.update")] #[deserr(rename = "experimental.update")] ExperimentalFeaturesUpdate, + #[serde(rename = "export")] + #[deserr(rename = "export")] + Export, #[serde(rename = "network.get")] #[deserr(rename = "network.get")] NetworkGet, @@ -438,6 +441,8 @@ pub mod actions { pub const EXPERIMENTAL_FEATURES_GET: u8 = ExperimentalFeaturesGet.repr(); pub const EXPERIMENTAL_FEATURES_UPDATE: u8 = ExperimentalFeaturesUpdate.repr(); + pub const EXPORT: u8 = Export.repr(); + pub const NETWORK_GET: u8 = NetworkGet.repr(); pub const NETWORK_UPDATE: u8 = NetworkUpdate.repr(); diff --git a/crates/meilisearch-types/src/lib.rs b/crates/meilisearch-types/src/lib.rs index a1a57b7e6..fe69da526 100644 --- a/crates/meilisearch-types/src/lib.rs +++ b/crates/meilisearch-types/src/lib.rs @@ -18,7 +18,7 @@ pub mod versioning; pub use milli::{heed, Index}; use uuid::Uuid; pub use versioning::VERSION_FILE_NAME; -pub use {milli, serde_cs}; +pub use {byte_unit, milli, serde_cs}; pub type Document = serde_json::Map; pub type InstanceUid = Uuid; diff --git a/crates/meilisearch-types/src/settings.rs b/crates/meilisearch-types/src/settings.rs index 5e5f3b5b3..7d64440ce 100644 --- a/crates/meilisearch-types/src/settings.rs +++ b/crates/meilisearch-types/src/settings.rs @@ -969,6 +969,7 @@ pub fn settings( if let SecretPolicy::HideSecrets = secret_policy { settings.hide_secrets() } + Ok(settings) } diff --git a/crates/meilisearch-types/src/task_view.rs b/crates/meilisearch-types/src/task_view.rs index 86a00426b..7521137c0 100644 --- a/crates/meilisearch-types/src/task_view.rs +++ b/crates/meilisearch-types/src/task_view.rs @@ -1,3 +1,6 @@ +use std::collections::BTreeMap; + +use byte_unit::UnitType; use milli::Object; use serde::{Deserialize, Serialize}; use time::{Duration, OffsetDateTime}; @@ -6,7 +9,9 @@ use utoipa::ToSchema; use crate::batches::BatchId; use crate::error::ResponseError; use crate::settings::{Settings, Unchecked}; -use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId}; +use crate::tasks::{ + serialize_duration, Details, DetailsExportIndexSettings, IndexSwap, Kind, Status, Task, TaskId, +}; #[derive(Debug, Clone, PartialEq, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -118,6 +123,15 @@ pub struct DetailsView { pub upgrade_from: Option, #[serde(skip_serializing_if = "Option::is_none")] pub upgrade_to: Option, + // exporting + #[serde(skip_serializing_if = "Option::is_none")] + pub url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub api_key: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payload_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub indexes: Option>, } impl DetailsView { @@ -238,6 +252,34 @@ impl DetailsView { Some(left) } }, + url: match (self.url.clone(), other.url.clone()) { + (None, None) => None, + (None, Some(url)) | (Some(url), None) => Some(url), + // We should never be able to batch multiple exports at the same time. 
+ // So we return the first one we encounter but that shouldn't be an issue anyway. + (Some(left), Some(_right)) => Some(left), + }, + api_key: match (self.api_key.clone(), other.api_key.clone()) { + (None, None) => None, + (None, Some(key)) | (Some(key), None) => Some(key), + // We should never be able to batch multiple exports at the same time. + // So we return the first one we encounter but that shouldn't be an issue anyway. + (Some(left), Some(_right)) => Some(left), + }, + payload_size: match (self.payload_size.clone(), other.payload_size.clone()) { + (None, None) => None, + (None, Some(size)) | (Some(size), None) => Some(size), + // We should never be able to batch multiple exports at the same time. + // So we return the first one we encounter but that shouldn't be an issue anyway. + (Some(left), Some(_right)) => Some(left), + }, + indexes: match (self.indexes.clone(), other.indexes.clone()) { + (None, None) => None, + (None, Some(indexes)) | (Some(indexes), None) => Some(indexes), + // We should never be able to batch multiple exports at the same time. + // So we return the first one we encounter but that shouldn't be an issue anyway. + (Some(left), Some(_right)) => Some(left), + }, // We want the earliest version upgrade_from: match (self.upgrade_from.clone(), other.upgrade_from.clone()) { (None, None) => None, @@ -327,6 +369,22 @@ impl From
for DetailsView { Details::IndexSwap { swaps } => { DetailsView { swaps: Some(swaps), ..Default::default() } } + Details::Export { url, api_key, payload_size, indexes } => DetailsView { + url: Some(url), + api_key: api_key.map(|mut api_key| { + hide_secret(&mut api_key); + api_key + }), + payload_size: payload_size + .map(|ps| ps.get_appropriate_unit(UnitType::Both).to_string()), + indexes: Some( + indexes + .into_iter() + .map(|(pattern, settings)| (pattern.to_string(), settings)) + .collect(), + ), + ..Default::default() + }, Details::UpgradeDatabase { from, to } => DetailsView { upgrade_from: Some(format!("v{}.{}.{}", from.0, from.1, from.2)), upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)), @@ -335,3 +393,21 @@ impl From
for DetailsView { } } } + +// We definitely need to factorize the code to hide the secret key +fn hide_secret(secret: &mut String) { + match secret.len() { + x if x < 10 => { + secret.replace_range(.., "XXX..."); + } + x if x < 20 => { + secret.replace_range(2.., "XXXX..."); + } + x if x < 30 => { + secret.replace_range(3.., "XXXXX..."); + } + _x => { + secret.replace_range(5.., "XXXXXX..."); + } + } +} diff --git a/crates/meilisearch-types/src/tasks.rs b/crates/meilisearch-types/src/tasks.rs index 95c52d9a6..99b04f1e3 100644 --- a/crates/meilisearch-types/src/tasks.rs +++ b/crates/meilisearch-types/src/tasks.rs @@ -1,19 +1,22 @@ use core::fmt; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::fmt::{Display, Write}; use std::str::FromStr; +use byte_unit::Byte; use enum_iterator::Sequence; use milli::update::IndexDocumentsMethod; use milli::Object; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize, Serializer}; +use serde_json::Value; use time::{Duration, OffsetDateTime}; -use utoipa::ToSchema; +use utoipa::{schema, ToSchema}; use uuid::Uuid; use crate::batches::BatchId; use crate::error::ResponseError; +use crate::index_uid_pattern::IndexUidPattern; use crate::keys::Key; use crate::settings::{Settings, Unchecked}; use crate::{versioning, InstanceUid}; @@ -50,6 +53,7 @@ impl Task { | SnapshotCreation | TaskCancelation { .. } | TaskDeletion { .. } + | Export { .. } | UpgradeDatabase { .. } | IndexSwap { .. } => None, DocumentAdditionOrUpdate { index_uid, .. } @@ -86,6 +90,7 @@ impl Task { | KindWithContent::TaskDeletion { .. } | KindWithContent::DumpCreation { .. } | KindWithContent::SnapshotCreation + | KindWithContent::Export { .. } | KindWithContent::UpgradeDatabase { .. } => None, } } @@ -108,11 +113,11 @@ pub enum KindWithContent { }, DocumentDeletionByFilter { index_uid: String, - filter_expr: serde_json::Value, + filter_expr: Value, }, DocumentEdition { index_uid: String, - filter_expr: Option, + filter_expr: Option, context: Option, function: String, }, @@ -152,6 +157,12 @@ pub enum KindWithContent { instance_uid: Option, }, SnapshotCreation, + Export { + url: String, + api_key: Option, + payload_size: Option, + indexes: BTreeMap, + }, UpgradeDatabase { from: (u32, u32, u32), }, @@ -163,6 +174,13 @@ pub struct IndexSwap { pub indexes: (String, String), } +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ExportIndexSettings { + pub filter: Option, + pub override_settings: bool, +} + impl KindWithContent { pub fn as_kind(&self) -> Kind { match self { @@ -180,6 +198,7 @@ impl KindWithContent { KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion, KindWithContent::DumpCreation { .. } => Kind::DumpCreation, KindWithContent::SnapshotCreation => Kind::SnapshotCreation, + KindWithContent::Export { .. } => Kind::Export, KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase, } } @@ -192,6 +211,7 @@ impl KindWithContent { | SnapshotCreation | TaskCancelation { .. } | TaskDeletion { .. } + | Export { .. } | UpgradeDatabase { .. } => vec![], DocumentAdditionOrUpdate { index_uid, .. } | DocumentEdition { index_uid, .. } @@ -269,6 +289,14 @@ impl KindWithContent { }), KindWithContent::DumpCreation { .. 
} => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, + KindWithContent::Export { url, api_key, payload_size, indexes } => { + Some(Details::Export { + url: url.clone(), + api_key: api_key.clone(), + payload_size: *payload_size, + indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), + }) + } KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase { from: (from.0, from.1, from.2), to: ( @@ -335,6 +363,14 @@ impl KindWithContent { }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, + KindWithContent::Export { url, api_key, payload_size, indexes } => { + Some(Details::Export { + url: url.clone(), + api_key: api_key.clone(), + payload_size: *payload_size, + indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), + }) + } KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase { from: *from, to: ( @@ -383,6 +419,14 @@ impl From<&KindWithContent> for Option
{ }), KindWithContent::DumpCreation { .. } => Some(Details::Dump { dump_uid: None }), KindWithContent::SnapshotCreation => None, + KindWithContent::Export { url, api_key, payload_size, indexes } => { + Some(Details::Export { + url: url.clone(), + api_key: api_key.clone(), + payload_size: *payload_size, + indexes: indexes.iter().map(|(p, s)| (p.clone(), s.clone().into())).collect(), + }) + } KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase { from: *from, to: ( @@ -499,6 +543,7 @@ pub enum Kind { TaskDeletion, DumpCreation, SnapshotCreation, + Export, UpgradeDatabase, } @@ -516,6 +561,7 @@ impl Kind { | Kind::TaskCancelation | Kind::TaskDeletion | Kind::DumpCreation + | Kind::Export | Kind::UpgradeDatabase | Kind::SnapshotCreation => false, } @@ -536,6 +582,7 @@ impl Display for Kind { Kind::TaskDeletion => write!(f, "taskDeletion"), Kind::DumpCreation => write!(f, "dumpCreation"), Kind::SnapshotCreation => write!(f, "snapshotCreation"), + Kind::Export => write!(f, "export"), Kind::UpgradeDatabase => write!(f, "upgradeDatabase"), } } @@ -568,6 +615,8 @@ impl FromStr for Kind { Ok(Kind::DumpCreation) } else if kind.eq_ignore_ascii_case("snapshotCreation") { Ok(Kind::SnapshotCreation) + } else if kind.eq_ignore_ascii_case("export") { + Ok(Kind::Export) } else if kind.eq_ignore_ascii_case("upgradeDatabase") { Ok(Kind::UpgradeDatabase) } else { @@ -643,12 +692,33 @@ pub enum Details { IndexSwap { swaps: Vec, }, + Export { + url: String, + api_key: Option, + payload_size: Option, + indexes: BTreeMap, + }, UpgradeDatabase { from: (u32, u32, u32), to: (u32, u32, u32), }, } +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)] +#[schema(rename_all = "camelCase")] +pub struct DetailsExportIndexSettings { + #[serde(flatten)] + pub settings: ExportIndexSettings, + #[serde(skip_serializing_if = "Option::is_none")] + pub matched_documents: Option, +} + +impl From for DetailsExportIndexSettings { + fn from(settings: ExportIndexSettings) -> Self { + DetailsExportIndexSettings { settings, matched_documents: None } + } +} + impl Details { pub fn to_failed(&self) -> Self { let mut details = self.clone(); @@ -667,6 +737,7 @@ impl Details { Self::SettingsUpdate { .. } | Self::IndexInfo { .. } | Self::Dump { .. } + | Self::Export { .. } | Self::UpgradeDatabase { .. } | Self::IndexSwap { .. 
} => (), } diff --git a/crates/meilisearch/src/routes/export.rs b/crates/meilisearch/src/routes/export.rs new file mode 100644 index 000000000..a4b6720d1 --- /dev/null +++ b/crates/meilisearch/src/routes/export.rs @@ -0,0 +1,183 @@ +use std::collections::BTreeMap; +use std::convert::Infallible; +use std::str::FromStr as _; + +use actix_web::web::{self, Data}; +use actix_web::{HttpRequest, HttpResponse}; +use byte_unit::Byte; +use deserr::actix_web::AwebJson; +use deserr::Deserr; +use index_scheduler::IndexScheduler; +use meilisearch_types::deserr::DeserrJsonError; +use meilisearch_types::error::deserr_codes::*; +use meilisearch_types::error::ResponseError; +use meilisearch_types::index_uid_pattern::IndexUidPattern; +use meilisearch_types::keys::actions; +use meilisearch_types::tasks::{ExportIndexSettings as DbExportIndexSettings, KindWithContent}; +use serde::Serialize; +use serde_json::Value; +use tracing::debug; +use utoipa::{OpenApi, ToSchema}; + +use crate::analytics::Analytics; +use crate::extractors::authentication::policies::ActionPolicy; +use crate::extractors::authentication::GuardedData; +use crate::routes::export_analytics::ExportAnalytics; +use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; +use crate::Opt; + +#[derive(OpenApi)] +#[openapi( + paths(export), + tags(( + name = "Export", + description = "The `/export` route allows you to trigger an export process to a remote Meilisearch instance.", + external_docs(url = "https://www.meilisearch.com/docs/reference/api/export"), + )), +)] +pub struct ExportApi; + +pub fn configure(cfg: &mut web::ServiceConfig) { + cfg.service(web::resource("").route(web::post().to(export))); +} + +#[utoipa::path( + post, + path = "", + tag = "Export", + security(("Bearer" = ["export", "*"])), + responses( + (status = 202, description = "Export successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!( + { + "taskUid": 1, + "status": "enqueued", + "type": "export", + "enqueuedAt": "2021-08-11T09:25:53.000000Z" + })), + (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( + { + "message": "The Authorization header is missing. It must use the bearer authorization method.", + "code": "missing_authorization_header", + "type": "auth", + "link": "https://docs.meilisearch.com/errors#missing_authorization_header" + } + )), + ) +)] +async fn export( + index_scheduler: GuardedData, Data>, + export: AwebJson, + req: HttpRequest, + opt: web::Data, + analytics: Data, +) -> Result { + let export = export.into_inner(); + debug!(returns = ?export, "Trigger export"); + + let analytics_aggregate = ExportAnalytics::from_export(&export); + + let Export { url, api_key, payload_size, indexes } = export; + + let indexes = match indexes { + Some(indexes) => indexes + .into_iter() + .map(|(pattern, ExportIndexSettings { filter, override_settings })| { + (pattern, DbExportIndexSettings { filter, override_settings }) + }) + .collect(), + None => BTreeMap::from([( + IndexUidPattern::new_unchecked("*"), + DbExportIndexSettings::default(), + )]), + }; + + let task = KindWithContent::Export { + url, + api_key, + payload_size: payload_size.map(|ByteWithDeserr(bytes)| bytes), + indexes, + }; + let uid = get_task_id(&req, &opt)?; + let dry_run = is_dry_run(&req, &opt)?; + let task: SummarizedTaskView = + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) + .await?? 
.into(); + + analytics.publish(analytics_aggregate, &req); + + Ok(HttpResponse::Accepted().json(task)) +} + +#[derive(Debug, Deserr, ToSchema, Serialize)] +#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +#[schema(rename_all = "camelCase")] +pub struct Export { + #[schema(value_type = Option<String>, example = json!("https://ms-1234.heaven.meilisearch.com"))] + #[serde(default)] + #[deserr(default, error = DeserrJsonError<InvalidExportUrl>)] + pub url: String, + #[schema(value_type = Option<String>, example = json!("1234abcd"))] + #[serde(default)] + #[deserr(default, error = DeserrJsonError<InvalidExportApiKey>)] + pub api_key: Option<String>, + #[schema(value_type = Option<String>, example = json!("24MiB"))] + #[serde(default)] + #[deserr(default, error = DeserrJsonError<InvalidExportPayloadSize>)] + pub payload_size: Option<ByteWithDeserr>, + #[schema(value_type = Option<BTreeMap<String, ExportIndexSettings>>, example = json!({ "*": { "filter": null } }))] + #[deserr(default)] + #[serde(default)] + pub indexes: Option<BTreeMap<IndexUidPattern, ExportIndexSettings>>, +} + +/// A wrapper around the `Byte` type that implements `Deserr`. +#[derive(Debug, Serialize)] +#[serde(transparent)] +pub struct ByteWithDeserr(pub Byte); + +impl<E> deserr::Deserr<E> for ByteWithDeserr +where + E: deserr::DeserializeError, +{ + fn deserialize_from_value<V: deserr::IntoValue>( + value: deserr::Value<V>, + location: deserr::ValuePointerRef, + ) -> Result<Self, E> { + use deserr::{ErrorKind, Value, ValueKind}; + match value { + Value::Integer(integer) => Ok(ByteWithDeserr(Byte::from_u64(integer))), + Value::String(string) => Byte::from_str(&string).map(ByteWithDeserr).map_err(|e| { + deserr::take_cf_content(E::error::<Infallible>( + None, + ErrorKind::Unexpected { msg: e.to_string() }, + location, + )) + }), + actual => Err(deserr::take_cf_content(E::error( + None, + ErrorKind::IncorrectValueKind { + actual, + accepted: &[ValueKind::Integer, ValueKind::String], + }, + location, + ))), + } + } +} + +#[derive(Debug, Deserr, ToSchema, Serialize)] +#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +#[schema(rename_all = "camelCase")] +pub struct ExportIndexSettings { + #[schema(value_type = Option<String>, example = json!("genres = action"))] + #[serde(default)] + #[deserr(default, error = DeserrJsonError<InvalidExportIndexFilter>)] + pub filter: Option<Value>, + #[schema(value_type = Option<bool>, example = json!(true))] + #[serde(default)] + #[deserr(default, error = DeserrJsonError<InvalidExportIndexOverrideSettings>)] + pub override_settings: bool, +} diff --git a/crates/meilisearch/src/routes/export_analytics.rs b/crates/meilisearch/src/routes/export_analytics.rs new file mode 100644 index 000000000..b66a5133b --- /dev/null +++ b/crates/meilisearch/src/routes/export_analytics.rs @@ -0,0 +1,93 @@ +use crate::analytics::Aggregate; +use crate::routes::export::Export; + +#[derive(Default)] +pub struct ExportAnalytics { + total_received: usize, + has_api_key: bool, + sum_index_patterns: usize, + sum_patterns_with_filter: usize, + sum_patterns_with_override_settings: usize, + payload_sizes: Vec<u64>, +} + +impl ExportAnalytics { + pub fn from_export(export: &Export) -> Self { + let Export { url: _, api_key, payload_size, indexes } = export; + + let has_api_key = api_key.is_some(); + let index_patterns_count = indexes.as_ref().map_or(0, |indexes| indexes.len()); + let patterns_with_filter_count = indexes.as_ref().map_or(0, |indexes| { + indexes.values().filter(|settings| settings.filter.is_some()).count() + }); + let patterns_with_override_settings_count = indexes.as_ref().map_or(0, |indexes| { + indexes.values().filter(|settings| settings.override_settings).count() + }); + let payload_sizes = + if let
Some(crate::routes::export::ByteWithDeserr(byte_size)) = payload_size { + vec![byte_size.as_u64()] + } else { + vec![] + }; + + Self { + total_received: 1, + has_api_key, + sum_index_patterns: index_patterns_count, + sum_patterns_with_filter: patterns_with_filter_count, + sum_patterns_with_override_settings: patterns_with_override_settings_count, + payload_sizes, + } + } +} + +impl Aggregate for ExportAnalytics { + fn event_name(&self) -> &'static str { + "Export Triggered" + } + + fn aggregate(mut self: Box, other: Box) -> Box { + self.total_received += other.total_received; + self.has_api_key |= other.has_api_key; + self.sum_index_patterns += other.sum_index_patterns; + self.sum_patterns_with_filter += other.sum_patterns_with_filter; + self.sum_patterns_with_override_settings += other.sum_patterns_with_override_settings; + self.payload_sizes.extend(other.payload_sizes); + self + } + + fn into_event(self: Box) -> serde_json::Value { + let avg_payload_size = if self.payload_sizes.is_empty() { + None + } else { + Some(self.payload_sizes.iter().sum::() / self.payload_sizes.len() as u64) + }; + + let avg_index_patterns = if self.total_received == 0 { + None + } else { + Some(self.sum_index_patterns as f64 / self.total_received as f64) + }; + + let avg_patterns_with_filter = if self.total_received == 0 { + None + } else { + Some(self.sum_patterns_with_filter as f64 / self.total_received as f64) + }; + + let avg_patterns_with_override_settings = if self.total_received == 0 { + None + } else { + Some(self.sum_patterns_with_override_settings as f64 / self.total_received as f64) + }; + + serde_json::json!({ + "total_received": self.total_received, + "has_api_key": self.has_api_key, + "avg_index_patterns": avg_index_patterns, + "avg_patterns_with_filter": avg_patterns_with_filter, + "avg_patterns_with_override_settings": avg_patterns_with_override_settings, + "avg_payload_size": avg_payload_size, + }) + } +} diff --git a/crates/meilisearch/src/routes/mod.rs b/crates/meilisearch/src/routes/mod.rs index cc62e43c3..260d973a1 100644 --- a/crates/meilisearch/src/routes/mod.rs +++ b/crates/meilisearch/src/routes/mod.rs @@ -2,6 +2,7 @@ use std::collections::BTreeMap; use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; +use export::Export; use index_scheduler::IndexScheduler; use meilisearch_auth::AuthController; use meilisearch_types::batch_view::BatchView; @@ -54,6 +55,8 @@ mod api_key; pub mod batches; pub mod chats; mod dump; +mod export; +mod export_analytics; pub mod features; pub mod indexes; mod logs; @@ -84,6 +87,7 @@ mod tasks_test; (path = "/multi-search", api = multi_search::MultiSearchApi), (path = "/swap-indexes", api = swap_indexes::SwapIndexesApi), (path = "/experimental-features", api = features::ExperimentalFeaturesApi), + (path = "/export", api = export::ExportApi), (path = "/network", api = network::NetworkApi), ), paths(get_health, get_version, get_stats), @@ -95,7 +99,7 @@ mod tasks_test; url = "/", description = "Local server", )), - components(schemas(PaginationView, PaginationView, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, 
GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings, Settings, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind, Network, Remote, FilterableAttributesRule, FilterableAttributesPatterns, AttributePatterns, FilterableAttributesFeatures, FilterFeatures)) + components(schemas(PaginationView, PaginationView, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings, Settings, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind, Network, Remote, FilterableAttributesRule, FilterableAttributesPatterns, AttributePatterns, FilterableAttributesFeatures, FilterFeatures, Export)) )] pub struct MeilisearchApi; @@ -115,6 +119,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::scope("/metrics").configure(metrics::configure)) .service(web::scope("/experimental-features").configure(features::configure)) .service(web::scope("/network").configure(network::configure)) + .service(web::scope("/export").configure(export::configure)) .service(web::scope("/chats").configure(chats::configure)); #[cfg(feature = "swagger")] diff --git a/crates/meilisearch/src/routes/tasks_test.rs b/crates/meilisearch/src/routes/tasks_test.rs index a17b80c82..b09eb0fb3 100644 --- a/crates/meilisearch/src/routes/tasks_test.rs +++ b/crates/meilisearch/src/routes/tasks_test.rs @@ -228,7 +228,7 @@ mod tests { let err = deserr_query_params::(params).unwrap_err(); snapshot!(meili_snap::json_string!(err), @r#" { - "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.", + "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. 
Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.", "code": "invalid_task_types", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_task_types" diff --git a/crates/meilisearch/tests/auth/api_keys.rs b/crates/meilisearch/tests/auth/api_keys.rs index 5a18b4dbf..2688dd918 100644 --- a/crates/meilisearch/tests/auth/api_keys.rs +++ b/crates/meilisearch/tests/auth/api_keys.rs @@ -421,7 +421,7 @@ async fn error_add_api_key_invalid_parameters_actions() { meili_snap::snapshot!(code, @"400 Bad Request"); meili_snap::snapshot!(meili_snap::json_string!(response, { ".createdAt" => "[ignored]", ".updatedAt" => "[ignored]" }), @r###" { - "message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`", + "message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`", "code": "invalid_api_key_actions", "type": "invalid_request", "link": "https://docs.meilisearch.com/errors#invalid_api_key_actions" diff --git a/crates/meilisearch/tests/auth/errors.rs b/crates/meilisearch/tests/auth/errors.rs index ebe2e53fa..687cb67a0 100644 --- a/crates/meilisearch/tests/auth/errors.rs +++ b/crates/meilisearch/tests/auth/errors.rs @@ -93,7 +93,7 @@ async fn create_api_key_bad_actions() { snapshot!(code, @"400 Bad Request"); snapshot!(json_string!(response), @r###" { - "message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, 
diff --git a/crates/meilisearch/tests/batches/errors.rs b/crates/meilisearch/tests/batches/errors.rs
index 7f5fedb6a..bfc0d9251 100644
--- a/crates/meilisearch/tests/batches/errors.rs
+++ b/crates/meilisearch/tests/batches/errors.rs
@@ -42,7 +42,7 @@ async fn batch_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r#"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
diff --git a/crates/meilisearch/tests/tasks/errors.rs b/crates/meilisearch/tests/tasks/errors.rs
index 759531d42..9970bafa4 100644
--- a/crates/meilisearch/tests/tasks/errors.rs
+++ b/crates/meilisearch/tests/tasks/errors.rs
@@ -97,7 +97,7 @@ async fn task_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r#"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
@@ -108,7 +108,7 @@ async fn task_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r#"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
@@ -119,7 +119,7 @@ async fn task_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r#"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `upgradeDatabase`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`, `export`, `upgradeDatabase`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
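All of these snapshots record the same user-visible change: the task-type list gains `export` between `snapshotCreation` and `upgradeDatabase`. That ordering suggests the variant sits between those two in the `Kind` enum and serializes in camelCase; the sketch below is an inference from the error messages, not the actual `meilisearch_types::tasks::Kind` definition.

```rust
use serde::{Deserialize, Serialize};

// Reduced, hypothetical slice of the task-kind enum; just enough
// variants to show where `Export` lands in the serialized list.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
enum Kind {
    DumpCreation,
    SnapshotCreation,
    Export,          // serializes as "export"
    UpgradeDatabase, // serializes as "upgradeDatabase"
}

fn main() {
    assert_eq!(serde_json::to_string(&Kind::Export).unwrap(), r#""export""#);
}
```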
diff --git a/crates/milli/src/thread_pool_no_abort.rs b/crates/milli/src/thread_pool_no_abort.rs
index 0c2fbb30d..66380ff36 100644
--- a/crates/milli/src/thread_pool_no_abort.rs
+++ b/crates/milli/src/thread_pool_no_abort.rs
@@ -1,7 +1,7 @@
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 
-use rayon::{ThreadPool, ThreadPoolBuilder};
+use rayon::{BroadcastContext, ThreadPool, ThreadPoolBuilder};
 use thiserror::Error;
 
 /// A rayon ThreadPool wrapper that can catch panics in the pool
@@ -32,6 +32,22 @@ impl ThreadPoolNoAbort {
         }
     }
 
+    pub fn broadcast<OP, R>(&self, op: OP) -> Result<Vec<R>, PanicCatched>
+    where
+        OP: Fn(BroadcastContext<'_>) -> R + Sync,
+        R: Send,
+    {
+        self.active_operations.fetch_add(1, Ordering::Relaxed);
+        let output = self.thread_pool.broadcast(op);
+        self.active_operations.fetch_sub(1, Ordering::Relaxed);
+        // While resetting the pool's panic catcher, return an error if we caught one.
+        if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
+            Err(PanicCatched)
+        } else {
+            Ok(output)
+        }
+    }
+
     pub fn current_num_threads(&self) -> usize {
         self.thread_pool.current_num_threads()
     }
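The new `broadcast` follows the same bookkeeping pattern as the pool's existing operation wrappers: bump `active_operations`, delegate to rayon's `ThreadPool::broadcast`, then convert a recorded panic into `PanicCatched` instead of aborting. A usage sketch, assuming `ThreadPoolNoAbort` and `PanicCatched` are reachable from the crate root as they are elsewhere in the codebase:

```rust
use milli::{PanicCatched, ThreadPoolNoAbort};

// Run one closure on every worker thread and collect the per-thread
// results; a panic on any thread surfaces as PanicCatched instead of
// taking down the process.
fn warm_up(pool: &ThreadPoolNoAbort) -> Result<(), PanicCatched> {
    let seen = pool.broadcast(|ctx| {
        // rayon's BroadcastContext exposes the thread index and pool size.
        (ctx.index(), ctx.num_threads())
    })?;
    for (index, total) in seen {
        println!("worker {index} of {total} is ready");
    }
    Ok(())
}
```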
diff --git a/crates/milli/src/update/index_documents/extract/mod.rs b/crates/milli/src/update/index_documents/extract/mod.rs
index 9c1971356..d640bc075 100644
--- a/crates/milli/src/update/index_documents/extract/mod.rs
+++ b/crates/milli/src/update/index_documents/extract/mod.rs
@@ -213,7 +213,7 @@ fn run_extraction_task(
     })
 }
 
-fn request_threads() -> &'static ThreadPoolNoAbort {
+pub fn request_threads() -> &'static ThreadPoolNoAbort {
     static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
 
     REQUEST_THREADS.get_or_init(|| {
diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs
index 6e56ad155..5ec6910f7 100644
--- a/crates/milli/src/update/index_documents/mod.rs
+++ b/crates/milli/src/update/index_documents/mod.rs
@@ -12,6 +12,7 @@ use std::sync::Arc;
 use crossbeam_channel::{Receiver, Sender};
 use enrich::enrich_documents_batch;
+pub use extract::request_threads;
 use grenad::{Merger, MergerBuilder};
 use hashbrown::HashMap;
 use heed::types::Str;
diff --git a/crates/milli/src/update/mod.rs b/crates/milli/src/update/mod.rs
index 04ce68fc7..64eb9f1d3 100644
--- a/crates/milli/src/update/mod.rs
+++ b/crates/milli/src/update/mod.rs
@@ -4,7 +4,7 @@ pub use self::clear_documents::ClearDocuments;
 pub use self::concurrent_available_ids::ConcurrentAvailableIds;
 pub use self::facet::bulk::FacetsUpdateBulk;
 pub use self::facet::incremental::FacetsUpdateIncrementalInner;
-pub use self::index_documents::*;
+pub use self::index_documents::{request_threads, *};
 pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
 pub use self::new::ChannelCongestion;
 pub use self::settings::{validate_embedding_settings, Setting, Settings};
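Widening `request_threads()` to `pub` and re-exporting it makes the shared outbound-HTTP pool reachable as `milli::update::request_threads`, which is presumably how the export task parallelizes its uploads. A hedged sketch of that call pattern; the static every-N-th chunk split and the upload stub are illustrative only, not the scheduler's actual export loop:

```rust
use milli::update::request_threads;

// Fan export work out over the shared request pool: each worker
// thread takes every N-th payload, a simple static partition.
fn upload_batches(batches: &[Vec<u8>]) -> anyhow::Result<()> {
    request_threads().broadcast(|ctx| {
        for batch in batches.iter().skip(ctx.index()).step_by(ctx.num_threads()) {
            // A real exporter would POST `batch` to the remote host
            // here, retrying transient failures with backoff.
            let _ = batch.len();
        }
    })?;
    Ok(())
}
```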