get rids of meilisearch-lib

This commit is contained in:
Tamo 2022-09-27 16:33:37 +02:00 committed by Clément Renault
parent 0ba1c46e19
commit 2c8f1a43e9
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
38 changed files with 398 additions and 1620 deletions

View file

@ -50,6 +50,7 @@ meilisearch-types = { path = "../meilisearch-types" }
meilisearch-lib = { path = "../meilisearch-lib", default-features = false }
index = { path = "../index" }
index-scheduler = { path = "../index-scheduler" }
file-store = { path = "../file-store" }
document-formats = { path = "../document-formats" }
mimalloc = { version = "0.1.29", default-features = false }
mime = "0.3.16"

View file

@ -1,7 +1,6 @@
use actix_web as aweb;
use aweb::error::{JsonPayloadError, QueryPayloadError};
use document_formats::DocumentFormatError;
use meilisearch_lib::IndexControllerError;
use meilisearch_types::error::{Code, ErrorCode, ResponseError};
use tokio::task::JoinError;
@ -20,9 +19,9 @@ pub enum MeilisearchHttpError {
#[error(transparent)]
Payload(#[from] PayloadError),
#[error(transparent)]
DocumentFormat(#[from] DocumentFormatError),
FileStore(#[from] file_store::Error),
#[error(transparent)]
IndexController(#[from] IndexControllerError),
DocumentFormat(#[from] DocumentFormatError),
#[error(transparent)]
Join(#[from] JoinError),
}
@ -34,8 +33,8 @@ impl ErrorCode for MeilisearchHttpError {
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
MeilisearchHttpError::IndexScheduler(e) => e.error_code(),
MeilisearchHttpError::Payload(e) => e.error_code(),
MeilisearchHttpError::FileStore(_) => Code::Internal,
MeilisearchHttpError::DocumentFormat(e) => e.error_code(),
MeilisearchHttpError::IndexController(e) => e.error_code(),
MeilisearchHttpError::Join(_) => Code::Internal,
}
}

View file

@ -17,28 +17,36 @@ use std::time::Duration;
use crate::error::MeilisearchHttpError;
use actix_web::error::JsonPayloadError;
use actix_web::web::Data;
use analytics::Analytics;
use error::PayloadError;
use http::header::CONTENT_TYPE;
use index_scheduler::milli::update::IndexerConfig;
pub use option::Opt;
use actix_web::{web, HttpRequest};
use extractors::payload::PayloadConfig;
use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController;
use meilisearch_lib::MeiliSearch;
use sysinfo::{RefreshKind, System, SystemExt};
pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
let mut meilisearch = MeiliSearch::builder();
// disable autobatching?
AUTOBATCHING_ENABLED.store(
!opt.scheduler_options.disable_auto_batching,
std::sync::atomic::Ordering::Relaxed,
);
// TODO: TAMO: Finish setting up things
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<IndexScheduler> {
let meilisearch = IndexScheduler::new(
opt.db_path.join("tasks"),
opt.db_path.join("update_files"),
opt.db_path.join("indexes"),
opt.max_index_size.get_bytes() as usize,
(&opt.indexer_options).try_into()?,
#[cfg(test)]
todo!("We'll see later"),
)?;
/*
TODO: We should start a thread to handle the snapshots.
meilisearch
.set_max_index_size(opt.max_index_size.get_bytes() as usize)
.set_max_task_store_size(opt.max_task_db_size.get_bytes() as usize)
@ -63,24 +71,21 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
if opt.schedule_snapshot {
meilisearch.set_schedule_snapshot();
}
*/
meilisearch.build(
opt.db_path.clone(),
opt.indexer_options.clone(),
opt.scheduler_options.clone(),
)
Ok(meilisearch)
}
pub fn configure_data(
config: &mut web::ServiceConfig,
data: MeiliSearch,
index_scheduler: Data<IndexScheduler>,
auth: AuthController,
opt: &Opt,
analytics: Arc<dyn Analytics>,
) {
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
config
.app_data(data)
.app_data(index_scheduler)
.app_data(auth)
.app_data(web::Data::from(analytics))
.app_data(
@ -170,7 +175,7 @@ macro_rules! create_app {
use meilisearch_types::error::ResponseError;
let app = App::new()
.configure(|s| configure_data(s, $data.clone(), $auth.clone(), &$opt, $analytics))
.configure(|s| configure_data(s, $data, $auth.clone(), &$opt, $analytics))
.configure(routes::configure)
.configure(|s| dashboard(s, $enable_frontend));

View file

@ -2,13 +2,16 @@ use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use actix_cors::Cors;
use actix_web::http::KeepAlive;
use actix_web::HttpServer;
use actix_web::web::Data;
use actix_web::{middleware, HttpServer};
use clap::Parser;
use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController;
use meilisearch_http::analytics;
use meilisearch_http::analytics::Analytics;
use meilisearch_http::{create_app, setup_meilisearch, Opt};
use meilisearch_lib::MeiliSearch;
use meilisearch_http::{analytics, configure_data, create_app, dashboard, routes};
use meilisearch_http::{setup_meilisearch, Opt};
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
@ -45,9 +48,7 @@ async fn main() -> anyhow::Result<()> {
_ => unreachable!(),
}
let meilisearch = setup_meilisearch(&opt)?;
let m = meilisearch.clone();
tokio::task::spawn_blocking(move || m.run());
let index_scheduler = setup_meilisearch(&opt)?;
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?;
@ -62,39 +63,81 @@ async fn main() -> anyhow::Result<()> {
print_launch_resume(&opt, &user, config_read_from);
run_http(meilisearch, auth_controller, opt, analytics).await?;
run_http(index_scheduler, auth_controller, opt, analytics).await?;
Ok(())
}
async fn run_http(
data: MeiliSearch,
index_scheduler: IndexScheduler,
auth_controller: AuthController,
opt: Opt,
analytics: Arc<dyn Analytics>,
) -> anyhow::Result<()> {
let _enable_dashboard = &opt.env == "development";
let enable_dashboard = &opt.env == "development";
let opt_clone = opt.clone();
let index_scheduler = Data::new(index_scheduler);
let http_server = HttpServer::new(move || {
let app = actix_web::App::new()
.configure(|s| {
configure_data(
s,
index_scheduler.clone(),
auth_controller.clone(),
&opt,
analytics.clone(),
)
})
.configure(routes::configure)
.configure(|s| dashboard(s, enable_dashboard));
#[cfg(feature = "metrics")]
let app = app.configure(|s| configure_metrics_route(s, opt.enable_metrics_route));
let app = app
.wrap(
Cors::default()
.send_wildcard()
.allow_any_header()
.allow_any_origin()
.allow_any_method()
.max_age(86_400), // 24h
)
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())
.wrap(middleware::NormalizePath::new(
middleware::TrailingSlash::Trim,
));
#[cfg(feature = "metrics")]
let app = app.wrap(Condition::new(
opt.enable_metrics_route,
route_metrics::RouteMetrics,
));
app
/*
create_app!(
data,
auth_controller,
_enable_dashboard,
opt_clone,
index_scheduler.clone(),
auth_controller.clone(),
enable_dashboard,
opt,
analytics.clone()
)
*/
})
// Disable signals allows the server to terminate immediately when a user enter CTRL-C
.disable_signals()
.keep_alive(KeepAlive::Os);
if let Some(config) = opt.get_ssl_config()? {
if let Some(config) = opt_clone.get_ssl_config()? {
http_server
.bind_rustls(opt.http_addr, config)?
.bind_rustls(opt_clone.http_addr, config)?
.run()
.await?;
} else {
http_server.bind(&opt.http_addr)?.run().await?;
http_server.bind(&opt_clone.http_addr)?.run().await?;
}
Ok(())
}

View file

@ -1,15 +1,21 @@
use std::convert::TryFrom;
use std::env;
use std::fs;
use std::io::{BufReader, Read};
use std::num::ParseIntError;
use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::{fmt, fs};
use byte_unit::Byte;
use byte_unit::{Byte, ByteError};
use clap::Parser;
use meilisearch_lib::{
export_to_env_if_not_present,
options::{IndexerOpts, SchedulerConfig},
};
use index_scheduler::milli::update::IndexerConfig;
use rustls::{
server::{
AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient,
@ -19,6 +25,7 @@ use rustls::{
};
use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
use serde::{Deserialize, Serialize};
use sysinfo::{RefreshKind, System, SystemExt};
const POSSIBLE_ENV: [&str; 2] = ["development", "production"];
@ -434,6 +441,164 @@ impl Opt {
}
}
#[derive(Debug, Clone, Parser, Serialize)]
pub struct IndexerOpts {
/// The amount of documents to skip before printing
/// a log regarding the indexing advancement.
#[serde(skip)]
#[clap(long, default_value = "100000", hide = true)] // 100k
pub log_every_n: usize,
/// Grenad max number of chunks in bytes.
#[serde(skip)]
#[clap(long, hide = true)]
pub max_nb_chunks: Option<usize>,
/// The maximum amount of memory the indexer will use. It defaults to 2/3
/// of the available memory. It is recommended to use something like 80%-90%
/// of the available memory, no more.
///
/// In case the engine is unable to retrieve the available memory the engine will
/// try to use the memory it needs but without real limit, this can lead to
/// Out-Of-Memory issues and it is recommended to specify the amount of memory to use.
#[clap(long, env = "MEILI_MAX_INDEXING_MEMORY", default_value_t)]
pub max_indexing_memory: MaxMemory,
/// The maximum number of threads the indexer will use.
/// If the number set is higher than the real number of cores available in the machine,
/// it will use the maximum number of available cores.
///
/// It defaults to half of the available threads.
#[clap(long, env = "MEILI_MAX_INDEXING_THREADS", default_value_t)]
pub max_indexing_threads: MaxThreads,
}
#[derive(Debug, Clone, Parser, Default, Serialize)]
pub struct SchedulerConfig {
/// The engine will disable task auto-batching,
/// and will sequencialy compute each task one by one.
#[clap(long, env = "DISABLE_AUTO_BATCHING")]
pub disable_auto_batching: bool,
}
impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(*other.max_indexing_threads)
.build()?;
Ok(Self {
log_every_n: Some(other.log_every_n),
max_nb_chunks: other.max_nb_chunks,
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
thread_pool: Some(thread_pool),
max_positions_per_attributes: None,
..Default::default()
})
}
}
impl Default for IndexerOpts {
fn default() -> Self {
Self {
log_every_n: 100_000,
max_nb_chunks: None,
max_indexing_memory: MaxMemory::default(),
max_indexing_threads: MaxThreads::default(),
}
}
}
/// A type used to detect the max memory available and use 2/3 of it.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct MaxMemory(Option<Byte>);
impl FromStr for MaxMemory {
type Err = ByteError;
fn from_str(s: &str) -> Result<MaxMemory, ByteError> {
Byte::from_str(s).map(Some).map(MaxMemory)
}
}
impl Default for MaxMemory {
fn default() -> MaxMemory {
MaxMemory(
total_memory_bytes()
.map(|bytes| bytes * 2 / 3)
.map(Byte::from_bytes),
)
}
}
impl fmt::Display for MaxMemory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)),
None => f.write_str("unknown"),
}
}
}
impl Deref for MaxMemory {
type Target = Option<Byte>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl MaxMemory {
pub fn unlimited() -> Self {
Self(None)
}
}
/// Returns the total amount of bytes available or `None` if this system isn't supported.
fn total_memory_bytes() -> Option<u64> {
if System::IS_SUPPORTED {
let memory_kind = RefreshKind::new().with_memory();
let mut system = System::new_with_specifics(memory_kind);
system.refresh_memory();
Some(system.total_memory() * 1024) // KiB into bytes
} else {
None
}
}
#[derive(Debug, Clone, Copy, Serialize)]
pub struct MaxThreads(usize);
impl FromStr for MaxThreads {
type Err = ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
usize::from_str(s).map(Self)
}
}
impl Default for MaxThreads {
fn default() -> Self {
MaxThreads(num_cpus::get() / 2)
}
}
impl fmt::Display for MaxThreads {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl Deref for MaxThreads {
type Target = usize;
fn deref(&self) -> &Self::Target {
&self.0
}
}
fn load_certs(filename: PathBuf) -> anyhow::Result<Vec<rustls::Certificate>> {
let certfile =
fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open certificate file"))?;

View file

@ -1,7 +1,8 @@
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler;
use index_scheduler::KindWithContent;
use log::debug;
use meilisearch_lib::MeiliSearch;
use meilisearch_types::error::ResponseError;
use serde_json::json;
@ -14,16 +15,16 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
}
pub async fn create_dump(
meilisearch: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
let task = KindWithContent::DumpExport {
output: "toto".to_string().into(),
output: "todo".to_string().into(),
};
let res = meilisearch.register_task(task).await?;
let res = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", res);
Ok(HttpResponse::Accepted().json(res))

View file

@ -2,16 +2,16 @@ use std::io::Cursor;
use actix_web::error::PayloadError;
use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Bytes;
use actix_web::web::{Bytes, Data};
use actix_web::HttpMessage;
use actix_web::{web, HttpRequest, HttpResponse};
use bstr::ByteSlice;
use document_formats::{read_csv, read_json, read_ndjson, PayloadType};
use futures::{Stream, StreamExt};
use index_scheduler::milli::update::IndexDocumentsMethod;
use index_scheduler::IndexScheduler;
use index_scheduler::{KindWithContent, TaskView};
use log::debug;
use meilisearch_lib::milli::update::IndexDocumentsMethod;
use meilisearch_lib::MeiliSearch;
use meilisearch_types::error::ResponseError;
use meilisearch_types::star_or::StarOr;
use mime::Mime;
@ -95,24 +95,21 @@ pub struct GetDocument {
}
pub async fn get_document(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>,
params: web::Query<GetDocument>,
) -> Result<HttpResponse, ResponseError> {
let index = path.index_uid.clone();
let id = path.document_id.clone();
let GetDocument { fields } = params.into_inner();
let attributes_to_retrieve = fields.and_then(fold_star_or);
let document = meilisearch
.document(index, id, attributes_to_retrieve)
.await?;
let index = index_scheduler.index(&path.index_uid)?;
let document = index.retrieve_document(&path.document_id, attributes_to_retrieve)?;
debug!("returns: {:?}", document);
Ok(HttpResponse::Ok().json(document))
}
pub async fn delete_document(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> {
let DocumentParam {
@ -123,7 +120,7 @@ pub async fn delete_document(
index_uid,
documents_ids: vec![document_id],
};
let task = meilisearch.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
@ -139,8 +136,8 @@ pub struct BrowseQuery {
}
pub async fn get_all_documents(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: web::Query<BrowseQuery>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params);
@ -151,9 +148,8 @@ pub async fn get_all_documents(
} = params.into_inner();
let attributes_to_retrieve = fields.and_then(fold_star_or);
let (total, documents) = meilisearch
.documents(path.into_inner(), offset, limit, attributes_to_retrieve)
.await?;
let index = index_scheduler.index(&index_uid)?;
let (total, documents) = index.retrieve_documents(offset, limit, attributes_to_retrieve)?;
let ret = PaginationView::new(offset, limit, total as usize, documents);
@ -168,8 +164,8 @@ pub struct UpdateDocumentsQuery {
}
pub async fn add_documents(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
req: HttpRequest,
@ -177,19 +173,14 @@ pub async fn add_documents(
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params);
let params = params.into_inner();
let index_uid = path.into_inner();
analytics.add_documents(
&params,
meilisearch.get_index(index_uid.clone()).await.is_err(),
&req,
);
analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = meilisearch.filters().allow_index_creation;
let allow_index_creation = index_scheduler.filters().allow_index_creation;
let task = document_addition(
extract_mime_type(&req)?,
meilisearch,
index_uid,
index_scheduler,
index_uid.into_inner(),
params.primary_key,
body,
IndexDocumentsMethod::ReplaceDocuments,
@ -201,7 +192,7 @@ pub async fn add_documents(
}
pub async fn update_documents(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
path: web::Path<String>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
@ -211,16 +202,12 @@ pub async fn update_documents(
debug!("called with params: {:?}", params);
let index_uid = path.into_inner();
analytics.update_documents(
&params,
meilisearch.get_index(index_uid.clone()).await.is_err(),
&req,
);
analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = meilisearch.filters().allow_index_creation;
let allow_index_creation = index_scheduler.filters().allow_index_creation;
let task = document_addition(
extract_mime_type(&req)?,
meilisearch,
index_scheduler,
index_uid,
params.into_inner().primary_key,
body,
@ -234,7 +221,7 @@ pub async fn update_documents(
async fn document_addition(
mime_type: Option<Mime>,
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: String,
primary_key: Option<String>,
mut body: Payload,
@ -262,7 +249,7 @@ async fn document_addition(
}
};
let (uuid, mut update_file) = meilisearch.create_update_file()?;
let (uuid, mut update_file) = index_scheduler.create_update_file()?;
// push the entire stream into a `Vec`.
// TODO: Maybe we should write it to a file to reduce the RAM consumption
@ -281,7 +268,7 @@ async fn document_addition(
PayloadType::Ndjson => read_ndjson(reader, update_file.as_file_mut())?,
};
// we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist();
update_file.persist()?;
Ok(documents_count)
})
.await;
@ -289,11 +276,11 @@ async fn document_addition(
let documents_count = match documents_count {
Ok(Ok(documents_count)) => documents_count,
Ok(Err(e)) => {
meilisearch.delete_update_file(uuid)?;
index_scheduler.delete_update_file(uuid)?;
return Err(e.into());
}
Err(e) => {
meilisearch.delete_update_file(uuid)?;
index_scheduler.delete_update_file(uuid)?;
return Err(e.into());
}
};
@ -318,10 +305,11 @@ async fn document_addition(
_ => todo!(),
};
let task = match meilisearch.register_task(task).await {
let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
Ok(task) => task,
Err(e) => {
meilisearch.delete_update_file(uuid)?;
index_scheduler.delete_update_file(uuid)?;
return Err(e.into());
}
};
@ -331,7 +319,7 @@ async fn document_addition(
}
pub async fn delete_documents(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<String>,
body: web::Json<Vec<Value>>,
) -> Result<HttpResponse, ResponseError> {
@ -349,20 +337,20 @@ pub async fn delete_documents(
index_uid: path.into_inner(),
documents_ids: ids,
};
let task = meilisearch.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
pub async fn clear_all_documents(
meilisearch: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let task = KindWithContent::DocumentClear {
index_uid: path.into_inner(),
};
let task = meilisearch.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View file

@ -1,7 +1,7 @@
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::KindWithContent;
use index_scheduler::{IndexScheduler, KindWithContent};
use log::debug;
use meilisearch_lib::MeiliSearch;
use meilisearch_types::error::ResponseError;
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -40,17 +40,17 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
}
pub async fn list_indexes(
data: GuardedData<ActionPolicy<{ actions::INDEXES_GET }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_GET }>, Data<IndexScheduler>>,
paginate: web::Query<Pagination>,
) -> Result<HttpResponse, ResponseError> {
let search_rules = &data.filters().search_rules;
let indexes: Vec<_> = data.list_indexes().await?;
let search_rules = &index_scheduler.filters().search_rules;
let indexes: Vec<_> = index_scheduler.indexes()?;
let nb_indexes = indexes.len();
let iter = indexes
.into_iter()
.filter(|index| search_rules.is_index_authorized(&index.name));
/*
TODO: TAMO: implements me
TODO: TAMO: implements me. It's missing a kind of IndexView or something
let ret = paginate
.into_inner()
.auto_paginate_unsized(nb_indexes, iter);
@ -69,7 +69,7 @@ pub struct IndexCreateRequest {
}
pub async fn create_index(
meilisearch: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
body: web::Json<IndexCreateRequest>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
@ -88,7 +88,7 @@ pub async fn create_index(
index_uid: uid,
primary_key,
};
let task = meilisearch.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
Ok(HttpResponse::Accepted().json(task))
} else {
@ -118,10 +118,10 @@ pub struct UpdateIndexResponse {
}
pub async fn get_index(
meilisearch: GuardedData<ActionPolicy<{ actions::INDEXES_GET }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_GET }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let meta = meilisearch.get_index(path.into_inner()).await?;
let meta = index_scheduler.index(&index_uid)?;
debug!("returns: {:?}", meta);
// TODO: TAMO: do this as well
@ -130,7 +130,7 @@ pub async fn get_index(
}
pub async fn update_index(
meilisearch: GuardedData<ActionPolicy<{ actions::INDEXES_UPDATE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_UPDATE }>, Data<IndexScheduler>>,
path: web::Path<String>,
body: web::Json<UpdateIndexRequest>,
req: HttpRequest,
@ -149,26 +149,27 @@ pub async fn update_index(
primary_key: body.primary_key,
};
let task = meilisearch.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
pub async fn delete_index(
meilisearch: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = path.into_inner();
let task = KindWithContent::IndexDeletion { index_uid };
let task = meilisearch.register_task(task).await?;
let task = KindWithContent::IndexDeletion {
index_uid: index_uid.into_inner(),
};
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
Ok(HttpResponse::Accepted().json(task))
}
pub async fn get_index_stats(
meilisearch: GuardedData<ActionPolicy<{ actions::STATS_GET }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::STATS_GET }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
@ -177,7 +178,10 @@ pub async fn get_index_stats(
json!({ "per_index_uid": true }),
Some(&req),
);
let response = meilisearch.get_index_stats(path.into_inner()).await?;
let index = index_scheduler.index(&index_uid)?;
// TODO: TAMO: Bring the index_stats in meilisearch-http
// let response = index.get_index_stats()?;
let response = todo!();
debug!("returns: {:?}", response);
Ok(HttpResponse::Ok().json(response))

View file

@ -1,12 +1,13 @@
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use index::{
MatchingStrategy, SearchQuery, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
DEFAULT_SEARCH_OFFSET,
};
use index_scheduler::IndexScheduler;
use log::debug;
use meilisearch_auth::IndexSearchRules;
use meilisearch_lib::MeiliSearch;
use meilisearch_types::error::ResponseError;
use serde::Deserialize;
use serde_cs::vec::CS;
@ -136,8 +137,8 @@ fn fix_sort_query_parameters(sort_query: &str) -> Vec<String> {
}
pub async fn search_with_url_query(
meilisearch: GuardedData<ActionPolicy<{ actions::SEARCH }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: web::Query<SearchQueryGet>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
@ -145,9 +146,8 @@ pub async fn search_with_url_query(
debug!("called with params: {:?}", params);
let mut query: SearchQuery = params.into_inner().into();
let index_uid = path.into_inner();
// Tenant token search_rules.
if let Some(search_rules) = meilisearch
if let Some(search_rules) = index_scheduler
.filters()
.search_rules
.get_index_search_rules(&index_uid)
@ -157,7 +157,8 @@ pub async fn search_with_url_query(
let mut aggregate = SearchAggregator::from_query(&query, &req);
let search_result = meilisearch.search(index_uid, query).await;
let index = index_scheduler.index(&index_uid)?;
let search_result = index.perform_search(query);
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
@ -170,8 +171,8 @@ pub async fn search_with_url_query(
}
pub async fn search_with_post(
meilisearch: GuardedData<ActionPolicy<{ actions::SEARCH }>, MeiliSearch>,
path: web::Path<String>,
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: web::Json<SearchQuery>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
@ -179,9 +180,8 @@ pub async fn search_with_post(
let mut query = params.into_inner();
debug!("search called with params: {:?}", query);
let index_uid = path.into_inner();
// Tenant token search_rules.
if let Some(search_rules) = meilisearch
if let Some(search_rules) = index_scheduler
.filters()
.search_rules
.get_index_search_rules(&index_uid)
@ -191,7 +191,8 @@ pub async fn search_with_post(
let mut aggregate = SearchAggregator::from_query(&query, &req);
let search_result = meilisearch.search(index_uid, query).await;
let index = index_scheduler.index(&index_uid)?;
let search_result = index.perform_search(query);
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}

View file

@ -1,9 +1,9 @@
use actix_web::web::Data;
use log::debug;
use actix_web::{web, HttpRequest, HttpResponse};
use index::{Settings, Unchecked};
use index_scheduler::KindWithContent;
use meilisearch_lib::MeiliSearch;
use index_scheduler::{IndexScheduler, KindWithContent};
use meilisearch_types::error::ResponseError;
use serde_json::json;
@ -14,13 +14,13 @@ use crate::extractors::authentication::{policies::*, GuardedData};
macro_rules! make_setting_route {
($route:literal, $update_verb:ident, $type:ty, $attr:ident, $camelcase_attr:literal, $analytics_var:ident, $analytics:expr) => {
pub mod $attr {
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse, Resource};
use log::debug;
use index::Settings;
use index_scheduler::KindWithContent;
use meilisearch_lib::milli::update::Setting;
use meilisearch_lib::MeiliSearch;
use index_scheduler::milli::update::Setting;
use index_scheduler::{IndexScheduler, KindWithContent};
use meilisearch_types::error::ResponseError;
use $crate::analytics::Analytics;
@ -28,7 +28,10 @@ macro_rules! make_setting_route {
use $crate::extractors::sequential_extractor::SeqHandler;
pub async fn delete(
meilisearch: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, MeiliSearch>,
index_scheduler: GuardedData<
ActionPolicy<{ actions::SETTINGS_UPDATE }>,
Data<IndexScheduler>,
>,
index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let new_settings = Settings {
@ -36,21 +39,25 @@ macro_rules! make_setting_route {
..Default::default()
};
let allow_index_creation = meilisearch.filters().allow_index_creation;
let allow_index_creation = index_scheduler.filters().allow_index_creation;
let task = KindWithContent::Settings {
index_uid: index_uid.into_inner(),
new_settings,
is_deletion: true,
allow_index_creation,
};
let task = meilisearch.register_task(task).await?;
let task =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
pub async fn update(
meilisearch: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, MeiliSearch>,
index_scheduler: GuardedData<
ActionPolicy<{ actions::SETTINGS_UPDATE }>,
Data<IndexScheduler>,
>,
index_uid: actix_web::web::Path<String>,
body: actix_web::web::Json<Option<$type>>,
req: HttpRequest,
@ -68,24 +75,28 @@ macro_rules! make_setting_route {
..Default::default()
};
let allow_index_creation = meilisearch.filters().allow_index_creation;
let allow_index_creation = index_scheduler.filters().allow_index_creation;
let task = KindWithContent::Settings {
index_uid: index_uid.into_inner(),
new_settings,
is_deletion: false,
allow_index_creation,
};
let task = meilisearch.register_task(task).await?;
let task =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
pub async fn get(
meilisearch: GuardedData<ActionPolicy<{ actions::SETTINGS_GET }>, MeiliSearch>,
index_scheduler: GuardedData<
ActionPolicy<{ actions::SETTINGS_GET }>,
Data<IndexScheduler>,
>,
index_uid: actix_web::web::Path<String>,
) -> std::result::Result<HttpResponse, ResponseError> {
let index = meilisearch.get_index(index_uid.into_inner()).await?;
let index = index_scheduler.index(&index_uid)?;
let settings = index.settings()?;
debug!("returns: {:?}", settings);
@ -353,7 +364,7 @@ generate_configure!(
);
pub async fn update_all(
meilisearch: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
body: web::Json<Settings<Unchecked>>,
req: HttpRequest,
@ -425,43 +436,43 @@ pub async fn update_all(
Some(&req),
);
let allow_index_creation = meilisearch.filters().allow_index_creation;
let allow_index_creation = index_scheduler.filters().allow_index_creation;
let task = KindWithContent::Settings {
index_uid: index_uid.into_inner(),
new_settings,
is_deletion: false,
allow_index_creation,
};
let task = meilisearch.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
pub async fn get_all(
meilisearch: GuardedData<ActionPolicy<{ actions::SETTINGS_GET }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_GET }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let index = meilisearch.get_index(index_uid.into_inner()).await?;
let index = index_scheduler.index(&index_uid)?;
let new_settings = index.settings()?;
debug!("returns: {:?}", new_settings);
Ok(HttpResponse::Ok().json(new_settings))
}
pub async fn delete_all(
data: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
let new_settings = Settings::cleared().into_unchecked();
let allow_index_creation = data.filters().allow_index_creation;
let allow_index_creation = index_scheduler.filters().allow_index_creation;
let task = KindWithContent::Settings {
index_uid: index_uid.into_inner(),
new_settings,
is_deletion: true,
allow_index_creation,
};
let task = data.register_task(task).await?;
let task = tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??;
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View file

@ -1,4 +1,6 @@
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler;
use log::debug;
use serde::{Deserialize, Serialize};
@ -6,7 +8,6 @@ use serde_json::json;
use time::OffsetDateTime;
use index::{Settings, Unchecked};
use meilisearch_lib::MeiliSearch;
use meilisearch_types::error::ResponseError;
use meilisearch_types::star_or::StarOr;
@ -232,7 +233,7 @@ pub async fn running() -> HttpResponse {
}
async fn get_stats(
meilisearch: GuardedData<ActionPolicy<{ actions::STATS_GET }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::STATS_GET }>, Data<IndexScheduler>>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
@ -241,8 +242,9 @@ async fn get_stats(
json!({ "per_index_uid": false }),
Some(&req),
);
let search_rules = &meilisearch.filters().search_rules;
let response = meilisearch.get_all_stats(search_rules).await?;
let search_rules = &index_scheduler.filters().search_rules;
// let response = index_scheduler.get_all_stats(search_rules).await?;
let response = todo!();
debug!("returns: {:?}", response);
Ok(HttpResponse::Ok().json(response))
@ -257,7 +259,7 @@ struct VersionResponse {
}
async fn get_version(
_meilisearch: GuardedData<ActionPolicy<{ actions::VERSION }>, MeiliSearch>,
_index_scheduler: GuardedData<ActionPolicy<{ actions::VERSION }>, Data<IndexScheduler>>,
) -> HttpResponse {
let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");

View file

@ -1,7 +1,7 @@
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::TaskId;
use index_scheduler::{IndexScheduler, TaskId};
use index_scheduler::{Kind, Status};
use meilisearch_lib::MeiliSearch;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::star_or::StarOr;
@ -15,7 +15,7 @@ use crate::extractors::sequential_extractor::SeqHandler;
use super::fold_star_or;
const DEFAULT_LIMIT: fn() -> usize = || 20;
const DEFAULT_LIMIT: fn() -> u32 = || 20;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(SeqHandler(get_tasks))))
@ -30,7 +30,7 @@ pub struct TasksFilterQuery {
status: Option<CS<StarOr<Status>>>,
index_uid: Option<CS<StarOr<IndexUid>>>,
#[serde(default = "DEFAULT_LIMIT")]
limit: usize,
limit: u32,
from: Option<TaskId>,
}
@ -60,7 +60,7 @@ fn task_status_matches_events(status: &TaskStatus, events: &[TaskEvent]) -> bool
}
async fn get_tasks(
meilisearch: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
params: web::Query<TasksFilterQuery>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
@ -73,7 +73,7 @@ async fn get_tasks(
from,
} = params.into_inner();
let search_rules = &meilisearch.filters().search_rules;
let search_rules = &index_scheduler.filters().search_rules;
// We first transform a potential indexUid=* into a "not specified indexUid filter"
// for every one of the filters: type, status, and indexUid.
@ -124,14 +124,16 @@ async fn get_tasks(
}
}
filters.from = from;
// We +1 just to know if there is more after this "page" or not.
let limit = limit.saturating_add(1);
filters.limit = limit;
let mut tasks_results: Vec<_> = meilisearch.list_tasks(filters).await?.into_iter().collect();
let mut tasks_results: Vec<_> = index_scheduler.get_tasks(filters)?.into_iter().collect();
// If we were able to fetch the number +1 tasks we asked
// it means that there is more to come.
let next = if tasks_results.len() == limit {
let next = if tasks_results.len() == limit as usize {
tasks_results.pop().map(|t| t.uid)
} else {
None
@ -151,7 +153,7 @@ async fn get_tasks(
}
async fn get_task(
meilisearch: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, MeiliSearch>,
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
task_id: web::Path<TaskId>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
@ -164,7 +166,7 @@ async fn get_task(
Some(&req),
);
let search_rules = &meilisearch.filters().search_rules;
let search_rules = &index_scheduler.filters().search_rules;
let mut filters = index_scheduler::Query::default();
if !search_rules.is_index_authorized("*") {
for (index, _policy) in search_rules.clone() {
@ -174,7 +176,7 @@ async fn get_task(
filters.uid = Some(vec![task_id]);
if let Some(task) = meilisearch.list_tasks(filters).await?.first() {
if let Some(task) = index_scheduler.get_tasks(filters)?.first() {
Ok(HttpResponse::Ok().json(task))
} else {
Err(index_scheduler::Error::TaskNotFound(task_id).into())