add an experimental cli parameter to allow specifying your task id

This commit is contained in:
Tamo 2024-02-20 11:24:44 +01:00
parent 9ee4f55e6c
commit 01ae46dd80
13 changed files with 78 additions and 21 deletions

View File

@ -250,6 +250,7 @@ impl super::Analytics for SegmentAnalytics {
struct Infos {
env: String,
experimental_enable_metrics: bool,
experimental_ha_parameters: bool,
experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize,
@ -288,6 +289,7 @@ impl From<Opt> for Infos {
let Opt {
db_path,
experimental_enable_metrics,
experimental_ha_parameters,
experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks,
@ -335,6 +337,7 @@ impl From<Opt> for Infos {
Self {
env,
experimental_enable_metrics,
experimental_ha_parameters,
experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage,
db_path: db_path != PathBuf::from("./data.ms"),

View File

@ -131,6 +131,7 @@ gen_seq! { SeqFromRequestFut3; A B C }
gen_seq! { SeqFromRequestFut4; A B C D }
gen_seq! { SeqFromRequestFut5; A B C D E }
gen_seq! { SeqFromRequestFut6; A B C D E F }
gen_seq! { SeqFromRequestFut7; A B C D E F G }
pin_project! {
#[project = ExtractProj]

View File

@ -453,6 +453,7 @@ pub fn configure_data(
.app_data(auth)
.app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs))
.app_data(web::Data::new(opt.clone()))
.app_data(
web::JsonConfig::default()
.limit(http_payload_size_limit)

View File

@ -51,6 +51,7 @@ const MEILI_IGNORE_MISSING_DUMP: &str = "MEILI_IGNORE_MISSING_DUMP";
const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_HA_PARAMETERS: &str = "MEILI_EXPERIMENTAL_HA_PARAMETERS";
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
@ -317,6 +318,17 @@ pub struct Opt {
#[serde(default)]
pub experimental_enable_logs_route: bool,
/// Enable multiple features that helps you to run meilisearch in a high availability context.
/// TODO: TAMO: Update the discussion link
/// For more information, see: <https://github.com/orgs/meilisearch/discussions/721>
///
/// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now
/// - Lets you specify a custom task ID upon registering a task
/// - Lets you execute dry-register a task (get an answer from the route but nothing is actually registered in meilisearch and it won't be processed)
#[clap(long, env = MEILI_EXPERIMENTAL_HA_PARAMETERS)]
#[serde(default)]
pub experimental_ha_parameters: bool,
/// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652>
#[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)]
#[serde(default)]
@ -423,6 +435,7 @@ impl Opt {
no_analytics,
experimental_enable_metrics,
experimental_enable_logs_route,
experimental_ha_parameters,
experimental_reduce_indexing_memory_usage,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
@ -479,6 +492,10 @@ impl Opt {
MEILI_EXPERIMENTAL_ENABLE_METRICS,
experimental_enable_metrics.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_HA_PARAMETERS,
experimental_ha_parameters.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE,
experimental_enable_logs_route.to_string(),

View File

@ -12,6 +12,7 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{get_task_id, SummarizedTaskView};
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
@ -21,6 +22,7 @@ pub async fn create_dump(
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
@ -29,7 +31,7 @@ pub async fn create_dump(
keys: auth_controller.list_keys()?,
instance_uid: analytics.instance_uid().cloned(),
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();

View File

@ -38,6 +38,7 @@ use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::search::parse_filter;
use crate::Opt;
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
@ -119,6 +120,7 @@ pub async fn delete_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = path.into_inner();
@ -130,7 +132,7 @@ pub async fn delete_document(
index_uid: index_uid.to_string(),
documents_ids: vec![document_id],
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
@ -268,6 +270,7 @@ pub async fn replace_documents(
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
body: Payload,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -278,7 +281,7 @@ pub async fn replace_documents(
analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
@ -302,6 +305,7 @@ pub async fn update_documents(
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
body: Payload,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -312,7 +316,7 @@ pub async fn update_documents(
analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
@ -472,6 +476,7 @@ pub async fn delete_documents_batch(
index_uid: web::Path<String>,
body: web::Json<Vec<Value>>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch");
@ -486,7 +491,7 @@ pub async fn delete_documents_batch(
let task =
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
@ -506,6 +511,7 @@ pub async fn delete_documents_by_filter(
index_uid: web::Path<String>,
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by filter");
@ -523,7 +529,7 @@ pub async fn delete_documents_by_filter(
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
@ -535,13 +541,14 @@ pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();

View File

@ -22,6 +22,7 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::Opt;
pub mod documents;
pub mod facet_search;
@ -123,6 +124,7 @@ pub async fn create_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
body: AwebJson<IndexCreateRequest, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Create index");
@ -137,7 +139,7 @@ pub async fn create_index(
);
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!(returns = ?task, "Create index");
@ -191,6 +193,7 @@ pub async fn update_index(
index_uid: web::Path<String>,
body: AwebJson<UpdateIndexRequest, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Update index");
@ -207,7 +210,7 @@ pub async fn update_index(
primary_key: body.primary_key,
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
@ -219,10 +222,11 @@ pub async fn delete_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!(returns = ?task, "Delete index");

View File

@ -16,6 +16,7 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::routes::{get_task_id, SummarizedTaskView};
use crate::Opt;
#[macro_export]
macro_rules! make_setting_route {
@ -34,6 +35,7 @@ macro_rules! make_setting_route {
use $crate::extractors::authentication::policies::*;
use $crate::extractors::authentication::GuardedData;
use $crate::extractors::sequential_extractor::SeqHandler;
use $crate::Opt;
use $crate::routes::{get_task_id, SummarizedTaskView};
pub async fn delete(
@ -43,6 +45,7 @@ macro_rules! make_setting_route {
>,
index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -57,7 +60,7 @@ macro_rules! make_setting_route {
is_deletion: true,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
.await??
@ -75,6 +78,7 @@ macro_rules! make_setting_route {
index_uid: actix_web::web::Path<String>,
body: deserr::actix_web::AwebJson<Option<$type>, $err_ty>,
req: HttpRequest,
opt: web::Data<Opt>,
$analytics_var: web::Data<dyn Analytics>,
) -> std::result::Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -107,7 +111,7 @@ macro_rules! make_setting_route {
is_deletion: false,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
.await??
@ -655,6 +659,7 @@ pub async fn update_all(
index_uid: web::Path<String>,
body: AwebJson<Settings<Unchecked>, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -770,7 +775,7 @@ pub async fn update_all(
is_deletion: false,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
@ -795,6 +800,7 @@ pub async fn delete_all(
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -808,7 +814,7 @@ pub async fn delete_all(
is_deletion: true,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();

View File

@ -15,6 +15,7 @@ use tracing::debug;
use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::Opt;
const PAGINATION_DEFAULT_LIMIT: usize = 20;
@ -45,7 +46,10 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/experimental-features").configure(features::configure));
}
pub fn get_task_id(req: &HttpRequest) -> Result<Option<TaskId>, ResponseError> {
pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result<Option<TaskId>, ResponseError> {
if !opt.experimental_ha_parameters {
return Ok(None);
}
let task_id = req
.headers()
.get("TaskId")

View File

@ -11,6 +11,7 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{get_task_id, SummarizedTaskView};
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
@ -19,12 +20,13 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
pub async fn create_snapshot(
index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
let task = KindWithContent::SnapshotCreation;
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();

View File

@ -16,6 +16,7 @@ use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes))));
@ -32,6 +33,7 @@ pub async fn swap_indexes(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner();
@ -60,7 +62,7 @@ pub async fn swap_indexes(
}
let task = KindWithContent::IndexSwap { swaps };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
Ok(HttpResponse::Accepted().json(task))

View File

@ -23,6 +23,7 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::Opt;
const DEFAULT_LIMIT: u32 = 20;
@ -161,6 +162,7 @@ async fn cancel_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner();
@ -197,7 +199,7 @@ async fn cancel_tasks(
let task_cancelation =
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task =
task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??;
let task: SummarizedTaskView = task.into();
@ -209,6 +211,7 @@ async fn delete_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let params = params.into_inner();
@ -244,7 +247,7 @@ async fn delete_tasks(
let task_deletion =
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
let uid = get_task_id(&req)?;
let uid = get_task_id(&req, &opt)?;
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??;
let task: SummarizedTaskView = task.into();

View File

@ -2,9 +2,10 @@ use actix_web::http::header::ContentType;
use actix_web::test;
use http::header::ACCEPT_ENCODING;
use meili_snap::{json_string, snapshot};
use meilisearch::Opt;
use crate::common::encoder::Encoder;
use crate::common::{Server, Value};
use crate::common::{default_settings, Server, Value};
use crate::json;
#[actix_rt::test]
@ -202,7 +203,11 @@ async fn error_create_with_invalid_index_uid() {
#[actix_rt::test]
async fn send_task_id() {
let server = Server::new().await;
let temp = tempfile::tempdir().unwrap();
let options = Opt { experimental_ha_parameters: true, ..default_settings(temp.path()) };
let server = Server::new_with_options(options).await.unwrap();
let app = server.init_web_app().await;
let index = server.index("catto");
let (response, code) = index.create(None).await;