mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-30 23:27:36 +01:00
Route network
This commit is contained in:
parent
b02c2b9a6f
commit
b232bf935b
@ -34,6 +34,7 @@ use crate::routes::features::RuntimeTogglableFeatures;
|
|||||||
use crate::routes::indexes::documents::{DocumentDeletionByFilter, DocumentEditionByFunction};
|
use crate::routes::indexes::documents::{DocumentDeletionByFilter, DocumentEditionByFunction};
|
||||||
use crate::routes::indexes::IndexView;
|
use crate::routes::indexes::IndexView;
|
||||||
use crate::routes::multi_search::SearchResults;
|
use crate::routes::multi_search::SearchResults;
|
||||||
|
use crate::routes::network::{Network, Remote};
|
||||||
use crate::routes::swap_indexes::SwapIndexesPayload;
|
use crate::routes::swap_indexes::SwapIndexesPayload;
|
||||||
use crate::search::{
|
use crate::search::{
|
||||||
FederatedSearch, FederatedSearchResult, Federation, FederationOptions, MergeFacets,
|
FederatedSearch, FederatedSearchResult, Federation, FederationOptions, MergeFacets,
|
||||||
@ -54,6 +55,7 @@ mod logs;
|
|||||||
mod metrics;
|
mod metrics;
|
||||||
mod multi_search;
|
mod multi_search;
|
||||||
mod multi_search_analytics;
|
mod multi_search_analytics;
|
||||||
|
pub mod network;
|
||||||
mod open_api_utils;
|
mod open_api_utils;
|
||||||
mod snapshot;
|
mod snapshot;
|
||||||
mod swap_indexes;
|
mod swap_indexes;
|
||||||
@ -75,6 +77,7 @@ pub mod tasks;
|
|||||||
(path = "/multi-search", api = multi_search::MultiSearchApi),
|
(path = "/multi-search", api = multi_search::MultiSearchApi),
|
||||||
(path = "/swap-indexes", api = swap_indexes::SwapIndexesApi),
|
(path = "/swap-indexes", api = swap_indexes::SwapIndexesApi),
|
||||||
(path = "/experimental-features", api = features::ExperimentalFeaturesApi),
|
(path = "/experimental-features", api = features::ExperimentalFeaturesApi),
|
||||||
|
(path = "/network", api = network::NetworkApi),
|
||||||
),
|
),
|
||||||
paths(get_health, get_version, get_stats),
|
paths(get_health, get_version, get_stats),
|
||||||
tags(
|
tags(
|
||||||
@ -85,7 +88,7 @@ pub mod tasks;
|
|||||||
url = "/",
|
url = "/",
|
||||||
description = "Local server",
|
description = "Local server",
|
||||||
)),
|
)),
|
||||||
components(schemas(PaginationView<KeyView>, PaginationView<IndexView>, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView<serde_json::Value>, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings<Unchecked>, Settings<Checked>, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind))
|
components(schemas(PaginationView<KeyView>, PaginationView<IndexView>, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView<serde_json::Value>, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings<Unchecked>, Settings<Checked>, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind, Network, Remote))
|
||||||
)]
|
)]
|
||||||
pub struct MeilisearchApi;
|
pub struct MeilisearchApi;
|
||||||
|
|
||||||
@ -103,7 +106,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
.service(web::scope("/multi-search").configure(multi_search::configure))
|
.service(web::scope("/multi-search").configure(multi_search::configure))
|
||||||
.service(web::scope("/swap-indexes").configure(swap_indexes::configure))
|
.service(web::scope("/swap-indexes").configure(swap_indexes::configure))
|
||||||
.service(web::scope("/metrics").configure(metrics::configure))
|
.service(web::scope("/metrics").configure(metrics::configure))
|
||||||
.service(web::scope("/experimental-features").configure(features::configure));
|
.service(web::scope("/experimental-features").configure(features::configure))
|
||||||
|
.service(web::scope("/network").configure(network::configure));
|
||||||
|
|
||||||
#[cfg(feature = "swagger")]
|
#[cfg(feature = "swagger")]
|
||||||
{
|
{
|
||||||
|
251
crates/meilisearch/src/routes/network.rs
Normal file
251
crates/meilisearch/src/routes/network.rs
Normal file
@ -0,0 +1,251 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use actix_web::web::{self, Data};
|
||||||
|
use actix_web::{HttpRequest, HttpResponse};
|
||||||
|
use deserr::actix_web::AwebJson;
|
||||||
|
use deserr::Deserr;
|
||||||
|
use index_scheduler::IndexScheduler;
|
||||||
|
use itertools::{EitherOrBoth, Itertools};
|
||||||
|
use meilisearch_types::deserr::DeserrJsonError;
|
||||||
|
use meilisearch_types::error::ResponseError;
|
||||||
|
use meilisearch_types::features::{Network as DbNetwork, Remote as DbRemote};
|
||||||
|
use meilisearch_types::keys::actions;
|
||||||
|
use meilisearch_types::milli::update::Setting;
|
||||||
|
use serde::Serialize;
|
||||||
|
use tracing::debug;
|
||||||
|
use utoipa::{OpenApi, ToSchema};
|
||||||
|
|
||||||
|
use crate::analytics::{Aggregate, Analytics};
|
||||||
|
use crate::extractors::authentication::policies::ActionPolicy;
|
||||||
|
use crate::extractors::authentication::GuardedData;
|
||||||
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
|
|
||||||
|
#[derive(OpenApi)]
|
||||||
|
#[openapi(
|
||||||
|
paths(get_network, patch_network),
|
||||||
|
tags((
|
||||||
|
name = "Network",
|
||||||
|
description = "The `/network` route allows you to describe the topology of a network of Meilisearch instances.
|
||||||
|
|
||||||
|
This route is **synchronous**. This means that no task object will be returned, and any change to the network will be made available immediately.",
|
||||||
|
external_docs(url = "https://www.meilisearch.com/docs/reference/api/network"),
|
||||||
|
)),
|
||||||
|
)]
|
||||||
|
pub struct NetworkApi;
|
||||||
|
|
||||||
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
|
cfg.service(
|
||||||
|
web::resource("")
|
||||||
|
.route(web::get().to(get_network))
|
||||||
|
.route(web::patch().to(SeqHandler(patch_network))),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get network topology
|
||||||
|
///
|
||||||
|
/// Get a list of all Meilisearch instances currently known to this instance.
|
||||||
|
#[utoipa::path(
|
||||||
|
get,
|
||||||
|
path = "",
|
||||||
|
tag = "Network",
|
||||||
|
security(("Bearer" = ["network.get", "network.*", "*"])),
|
||||||
|
responses(
|
||||||
|
(status = OK, description = "Known nodes are returned", body = Network, content_type = "application/json", example = json!(
|
||||||
|
{
|
||||||
|
"self": "ms-0",
|
||||||
|
"remotes": {
|
||||||
|
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
|
||||||
|
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
|
||||||
|
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
|
||||||
|
}
|
||||||
|
})),
|
||||||
|
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
|
||||||
|
{
|
||||||
|
"message": "The Authorization header is missing. It must use the bearer authorization method.",
|
||||||
|
"code": "missing_authorization_header",
|
||||||
|
"type": "auth",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
|
||||||
|
}
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
async fn get_network(
|
||||||
|
index_scheduler: GuardedData<ActionPolicy<{ actions::NETWORK_GET }>, Data<IndexScheduler>>,
|
||||||
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
index_scheduler.features().check_proxy_search("Using the /network route")?;
|
||||||
|
|
||||||
|
let network = index_scheduler.network();
|
||||||
|
debug!(returns = ?network, "Get network");
|
||||||
|
Ok(HttpResponse::Ok().json(network))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||||
|
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
#[schema(rename_all = "camelCase")]
|
||||||
|
pub struct Remote {
|
||||||
|
#[schema(value_type = Option<String>, example = json!({
|
||||||
|
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
|
||||||
|
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
|
||||||
|
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
|
||||||
|
}))]
|
||||||
|
#[deserr(default)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub url: Setting<String>,
|
||||||
|
#[schema(value_type = Option<String>, example = json!("XWnBI8QHUc-4IlqbKPLUDuhftNq19mQtjc6JvmivzJU"))]
|
||||||
|
#[deserr(default)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub search_api_key: Setting<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||||
|
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
#[schema(rename_all = "camelCase")]
|
||||||
|
pub struct Network {
|
||||||
|
#[schema(value_type = Option<BTreeMap<String, Remote>>, example = json!("http://localhost:7700"))]
|
||||||
|
#[deserr(default)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub remotes: Setting<BTreeMap<String, Option<Remote>>>,
|
||||||
|
#[schema(value_type = Option<String>, example = json!("ms-00"), rename = "self")]
|
||||||
|
#[serde(default, rename = "self")]
|
||||||
|
#[deserr(default, rename = "self")]
|
||||||
|
pub local: Setting<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Remote {
|
||||||
|
pub fn try_into_db_node(self, name: &str) -> Result<DbRemote, ResponseError> {
|
||||||
|
Ok(DbRemote {
|
||||||
|
url: self.url.set().ok_or(ResponseError::from_msg(
|
||||||
|
format!("Missing field `{name}.url`"),
|
||||||
|
meilisearch_types::error::Code::BadRequest,
|
||||||
|
))?,
|
||||||
|
search_api_key: self.search_api_key.set(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
pub struct PatchNetworkAnalytics {
|
||||||
|
network_size: usize,
|
||||||
|
network_has_self: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Aggregate for PatchNetworkAnalytics {
|
||||||
|
fn event_name(&self) -> &'static str {
|
||||||
|
"Network Updated"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn aggregate(self: Box<Self>, new: Box<Self>) -> Box<Self> {
|
||||||
|
Box::new(Self { network_size: new.network_size, network_has_self: new.network_has_self })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_event(self: Box<Self>) -> serde_json::Value {
|
||||||
|
serde_json::to_value(*self).unwrap_or_default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configure Network
|
||||||
|
///
|
||||||
|
/// Add or remove nodes from network.
|
||||||
|
#[utoipa::path(
|
||||||
|
patch,
|
||||||
|
path = "",
|
||||||
|
tag = "Network",
|
||||||
|
request_body = Network,
|
||||||
|
security(("Bearer" = ["network.update", "network.*", "*"])),
|
||||||
|
responses(
|
||||||
|
(status = OK, description = "New network state is returned", body = Network, content_type = "application/json", example = json!(
|
||||||
|
{
|
||||||
|
"self": "ms-0",
|
||||||
|
"remotes": {
|
||||||
|
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
|
||||||
|
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
|
||||||
|
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
|
||||||
|
}
|
||||||
|
})),
|
||||||
|
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
|
||||||
|
{
|
||||||
|
"message": "The Authorization header is missing. It must use the bearer authorization method.",
|
||||||
|
"code": "missing_authorization_header",
|
||||||
|
"type": "auth",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
|
||||||
|
}
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
)]
|
||||||
|
async fn patch_network(
|
||||||
|
index_scheduler: GuardedData<ActionPolicy<{ actions::NETWORK_UPDATE }>, Data<IndexScheduler>>,
|
||||||
|
new_network: AwebJson<Network, DeserrJsonError>,
|
||||||
|
req: HttpRequest,
|
||||||
|
analytics: Data<Analytics>,
|
||||||
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
index_scheduler.features().check_proxy_search("Using the /network route")?;
|
||||||
|
|
||||||
|
let new_network = new_network.0;
|
||||||
|
let old_network = index_scheduler.network();
|
||||||
|
debug!(parameters = ?new_network, "Patch network");
|
||||||
|
|
||||||
|
let merged_self = match new_network.local {
|
||||||
|
Setting::Set(new_self) => Some(new_self),
|
||||||
|
Setting::Reset => None,
|
||||||
|
Setting::NotSet => old_network.local,
|
||||||
|
};
|
||||||
|
|
||||||
|
let merged_remotes = match new_network.remotes {
|
||||||
|
Setting::Set(new_remotes) => {
|
||||||
|
let mut merged_remotes = BTreeMap::new();
|
||||||
|
for either_or_both in old_network
|
||||||
|
.remotes
|
||||||
|
.into_iter()
|
||||||
|
.merge_join_by(new_remotes.into_iter(), |left, right| left.0.cmp(&right.0))
|
||||||
|
{
|
||||||
|
match either_or_both {
|
||||||
|
EitherOrBoth::Both((key, old), (_, Some(new))) => {
|
||||||
|
let DbRemote { url: old_url, search_api_key: old_search_api_key } = old;
|
||||||
|
|
||||||
|
let Remote { url: new_url, search_api_key: new_search_api_key } = new;
|
||||||
|
|
||||||
|
let merged = DbRemote {
|
||||||
|
url: match new_url {
|
||||||
|
Setting::Set(new_url) => new_url,
|
||||||
|
Setting::Reset => todo!(),
|
||||||
|
Setting::NotSet => old_url,
|
||||||
|
},
|
||||||
|
search_api_key: match new_search_api_key {
|
||||||
|
Setting::Set(new_search_api_key) => Some(new_search_api_key),
|
||||||
|
Setting::Reset => None,
|
||||||
|
Setting::NotSet => old_search_api_key,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
merged_remotes.insert(key, merged);
|
||||||
|
}
|
||||||
|
EitherOrBoth::Both((_, _), (_, None)) | EitherOrBoth::Right((_, None)) => {}
|
||||||
|
EitherOrBoth::Left((key, node)) => {
|
||||||
|
merged_remotes.insert(key, node);
|
||||||
|
}
|
||||||
|
EitherOrBoth::Right((key, Some(node))) => {
|
||||||
|
let node = node.try_into_db_node(&key)?;
|
||||||
|
merged_remotes.insert(key, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
merged_remotes
|
||||||
|
}
|
||||||
|
Setting::Reset => BTreeMap::new(),
|
||||||
|
Setting::NotSet => old_network.remotes,
|
||||||
|
};
|
||||||
|
|
||||||
|
analytics.publish(
|
||||||
|
PatchNetworkAnalytics {
|
||||||
|
network_size: merged_remotes.len(),
|
||||||
|
network_has_self: merged_self.is_some(),
|
||||||
|
},
|
||||||
|
&req,
|
||||||
|
);
|
||||||
|
|
||||||
|
let merged_network = DbNetwork { local: merged_self, remotes: merged_remotes };
|
||||||
|
index_scheduler.put_network(merged_network.clone())?;
|
||||||
|
debug!(returns = ?merged_network, "Patch network");
|
||||||
|
Ok(HttpResponse::Ok().json(merged_network))
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user