mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-04 18:45:46 +01:00
Create a task on zookeeper side when a task is created locally
This commit is contained in:
parent
b2f36b9b97
commit
b66bf049b5
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1994,7 +1994,9 @@ dependencies = [
|
|||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"time",
|
"time",
|
||||||
|
"tokio",
|
||||||
"uuid 1.4.1",
|
"uuid 1.4.1",
|
||||||
|
"zookeeper-client",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -31,6 +31,8 @@ tempfile = "3.5.0"
|
|||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
|
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
|
||||||
uuid = { version = "1.3.1", features = ["serde", "v4"] }
|
uuid = { version = "1.3.1", features = ["serde", "v4"] }
|
||||||
|
tokio = { version = "1.27.0", features = ["full"] }
|
||||||
|
zookeeper-client = "0.5.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
big_s = "1.0.2"
|
big_s = "1.0.2"
|
||||||
|
@ -58,6 +58,7 @@ use time::format_description::well_known::Rfc3339;
|
|||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
|
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
use zookeeper_client as zk;
|
||||||
|
|
||||||
use crate::index_mapper::IndexMapper;
|
use crate::index_mapper::IndexMapper;
|
||||||
use crate::utils::{check_index_swap_validity, clamp_to_page_size};
|
use crate::utils::{check_index_swap_validity, clamp_to_page_size};
|
||||||
@ -258,6 +259,8 @@ pub struct IndexSchedulerOptions {
|
|||||||
pub max_number_of_tasks: usize,
|
pub max_number_of_tasks: usize,
|
||||||
/// The experimental features enabled for this instance.
|
/// The experimental features enabled for this instance.
|
||||||
pub instance_features: InstanceTogglableFeatures,
|
pub instance_features: InstanceTogglableFeatures,
|
||||||
|
/// zookeeper client
|
||||||
|
pub zk: Option<zk::Client>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Structure which holds meilisearch's indexes and schedules the tasks
|
/// Structure which holds meilisearch's indexes and schedules the tasks
|
||||||
@ -326,6 +329,9 @@ pub struct IndexScheduler {
|
|||||||
/// The path to the version file of Meilisearch.
|
/// The path to the version file of Meilisearch.
|
||||||
pub(crate) version_file_path: PathBuf,
|
pub(crate) version_file_path: PathBuf,
|
||||||
|
|
||||||
|
/// The URL to the ZooKeeper cluster
|
||||||
|
pub(crate) zk: Option<zk::Client>,
|
||||||
|
|
||||||
// ================= test
|
// ================= test
|
||||||
// The next entry is dedicated to the tests.
|
// The next entry is dedicated to the tests.
|
||||||
/// Provide a way to set a breakpoint in multiple part of the scheduler.
|
/// Provide a way to set a breakpoint in multiple part of the scheduler.
|
||||||
@ -367,6 +373,7 @@ impl IndexScheduler {
|
|||||||
snapshots_path: self.snapshots_path.clone(),
|
snapshots_path: self.snapshots_path.clone(),
|
||||||
dumps_path: self.dumps_path.clone(),
|
dumps_path: self.dumps_path.clone(),
|
||||||
auth_path: self.auth_path.clone(),
|
auth_path: self.auth_path.clone(),
|
||||||
|
zk: self.zk.clone(),
|
||||||
version_file_path: self.version_file_path.clone(),
|
version_file_path: self.version_file_path.clone(),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
||||||
@ -381,7 +388,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
impl IndexScheduler {
|
impl IndexScheduler {
|
||||||
/// Create an index scheduler and start its run loop.
|
/// Create an index scheduler and start its run loop.
|
||||||
pub fn new(
|
pub async fn new(
|
||||||
options: IndexSchedulerOptions,
|
options: IndexSchedulerOptions,
|
||||||
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
|
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
|
||||||
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
|
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
|
||||||
@ -463,7 +470,7 @@ impl IndexScheduler {
|
|||||||
snapshots_path: options.snapshots_path,
|
snapshots_path: options.snapshots_path,
|
||||||
auth_path: options.auth_path,
|
auth_path: options.auth_path,
|
||||||
version_file_path: options.version_file_path,
|
version_file_path: options.version_file_path,
|
||||||
|
zk: options.zk,
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr,
|
test_breakpoint_sdr,
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -473,7 +480,7 @@ impl IndexScheduler {
|
|||||||
features,
|
features,
|
||||||
};
|
};
|
||||||
|
|
||||||
this.run();
|
this.run().await;
|
||||||
Ok(this)
|
Ok(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -561,31 +568,63 @@ impl IndexScheduler {
|
|||||||
///
|
///
|
||||||
/// This function will execute in a different thread and must be called
|
/// This function will execute in a different thread and must be called
|
||||||
/// only once per index scheduler.
|
/// only once per index scheduler.
|
||||||
fn run(&self) {
|
async fn run(&self) {
|
||||||
let run = self.private_clone();
|
let run = self.private_clone();
|
||||||
std::thread::Builder::new()
|
tokio::task::spawn(async move {
|
||||||
.name(String::from("scheduler"))
|
#[cfg(test)]
|
||||||
.spawn(move || {
|
run.breakpoint(Breakpoint::Init);
|
||||||
#[cfg(test)]
|
|
||||||
run.breakpoint(Breakpoint::Init);
|
|
||||||
|
|
||||||
run.wake_up.wait();
|
let wake_up = run.wake_up.clone();
|
||||||
|
tokio::task::spawn_blocking(move || wake_up.wait()).await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match run.tick() {
|
match run.tick().await {
|
||||||
Ok(TickOutcome::TickAgain(_)) => (),
|
Ok(TickOutcome::TickAgain(_)) => (),
|
||||||
Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(),
|
Ok(TickOutcome::WaitForSignal) => {
|
||||||
Err(e) => {
|
let wake_up = run.wake_up.clone();
|
||||||
log::error!("{}", e);
|
tokio::task::spawn_blocking(move || wake_up.wait()).await;
|
||||||
// Wait one second when an irrecoverable error occurs.
|
}
|
||||||
if !e.is_recoverable() {
|
Err(e) => {
|
||||||
std::thread::sleep(Duration::from_secs(1));
|
log::error!("{}", e);
|
||||||
}
|
// Wait one second when an irrecoverable error occurs.
|
||||||
|
if !e.is_recoverable() {
|
||||||
|
std::thread::sleep(Duration::from_secs(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.unwrap();
|
});
|
||||||
|
|
||||||
|
if let Some(ref zk) = &self.zk {
|
||||||
|
let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
|
||||||
|
match zk.create("/tasks", &[], &options).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(zk::Error::NodeExists) => {
|
||||||
|
todo!("Syncronize with the cluster")
|
||||||
|
}
|
||||||
|
Err(e) => panic!("{e}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: fix unwrap by returning a clear error.
|
||||||
|
let mut watcher =
|
||||||
|
zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await.unwrap();
|
||||||
|
let czk = zk.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let zk = czk;
|
||||||
|
loop {
|
||||||
|
let zk::WatchedEvent { event_type, session_state, path } =
|
||||||
|
dbg!(watcher.changed().await);
|
||||||
|
match event_type {
|
||||||
|
zk::EventType::Session => panic!("Session error {:?}", session_state),
|
||||||
|
// A task as been added
|
||||||
|
zk::EventType::NodeDataChanged => {
|
||||||
|
// Add raw task content in local DB
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn indexer_config(&self) -> &IndexerConfig {
|
pub fn indexer_config(&self) -> &IndexerConfig {
|
||||||
@ -920,74 +959,118 @@ impl IndexScheduler {
|
|||||||
/// Register a new task in the scheduler.
|
/// Register a new task in the scheduler.
|
||||||
///
|
///
|
||||||
/// If it fails and data was associated with the task, it tries to delete the associated data.
|
/// If it fails and data was associated with the task, it tries to delete the associated data.
|
||||||
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
|
pub async fn register(&self, kind: KindWithContent) -> Result<Task> {
|
||||||
let mut wtxn = self.env.write_txn()?;
|
let id = match self.zk {
|
||||||
|
Some(ref zk) => {
|
||||||
|
// reserve uniq ID on zookeeper. And give it to the spawn blocking.
|
||||||
|
let options =
|
||||||
|
zk::CreateMode::PersistentSequential.with_acls(zk::Acls::anyone_all());
|
||||||
|
match zk.create("/tasks/task-", &[], &options).await {
|
||||||
|
Ok((_stats, id)) => Some(id),
|
||||||
|
Err(e) => panic!("{e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
|
let this = self.private_clone();
|
||||||
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
|
let task = tokio::task::spawn_blocking(move || {
|
||||||
&& (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50
|
let mut wtxn = this.env.write_txn()?;
|
||||||
{
|
|
||||||
return Err(Error::NoSpaceLeftInTaskQueue);
|
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
|
||||||
|
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
|
||||||
|
&& (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50
|
||||||
|
{
|
||||||
|
return Err(Error::NoSpaceLeftInTaskQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// get id generated by zookeeper or generate a local id.
|
||||||
|
let id = match id {
|
||||||
|
Some(id) => id.0 as u32,
|
||||||
|
None => this.next_task_id(&wtxn)?,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut task = Task {
|
||||||
|
uid: id,
|
||||||
|
enqueued_at: OffsetDateTime::now_utc(),
|
||||||
|
started_at: None,
|
||||||
|
finished_at: None,
|
||||||
|
error: None,
|
||||||
|
canceled_by: None,
|
||||||
|
details: kind.default_details(),
|
||||||
|
status: Status::Enqueued,
|
||||||
|
kind: kind.clone(),
|
||||||
|
};
|
||||||
|
// For deletion and cancelation tasks, we want to make extra sure that they
|
||||||
|
// don't attempt to delete/cancel tasks that are newer than themselves.
|
||||||
|
filter_out_references_to_newer_tasks(&mut task);
|
||||||
|
// If the register task is an index swap task, verify that it is well-formed
|
||||||
|
// (that it does not contain duplicate indexes).
|
||||||
|
check_index_swap_validity(&task)?;
|
||||||
|
|
||||||
|
this.register_raw_task(&mut wtxn, &task)?;
|
||||||
|
|
||||||
|
if let Err(e) = wtxn.commit() {
|
||||||
|
this.delete_persisted_task_data(&task)?;
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the registered task is a task cancelation
|
||||||
|
// we inform the processing tasks to stop (if necessary).
|
||||||
|
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
|
||||||
|
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
|
||||||
|
if this
|
||||||
|
.processing_tasks
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.must_cancel_processing_tasks(&tasks_to_cancel)
|
||||||
|
{
|
||||||
|
this.must_stop_processing.must_stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify the scheduler loop to execute a new tick
|
||||||
|
this.wake_up.signal();
|
||||||
|
|
||||||
|
Ok(task)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
|
// TODO: send task to ZK in raw json.
|
||||||
|
if let Some(ref zk) = self.zk {
|
||||||
|
let id = id.unwrap();
|
||||||
|
// TODO: ugly unwrap
|
||||||
|
zk.set_data(
|
||||||
|
&format!("/tasks/task-{}", id),
|
||||||
|
&serde_json::to_vec_pretty(&task).unwrap(),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut task = Task {
|
Ok(task)
|
||||||
uid: self.next_task_id(&wtxn)?,
|
}
|
||||||
enqueued_at: OffsetDateTime::now_utc(),
|
|
||||||
started_at: None,
|
|
||||||
finished_at: None,
|
|
||||||
error: None,
|
|
||||||
canceled_by: None,
|
|
||||||
details: kind.default_details(),
|
|
||||||
status: Status::Enqueued,
|
|
||||||
kind: kind.clone(),
|
|
||||||
};
|
|
||||||
// For deletion and cancelation tasks, we want to make extra sure that they
|
|
||||||
// don't attempt to delete/cancel tasks that are newer than themselves.
|
|
||||||
filter_out_references_to_newer_tasks(&mut task);
|
|
||||||
// If the register task is an index swap task, verify that it is well-formed
|
|
||||||
// (that it does not contain duplicate indexes).
|
|
||||||
check_index_swap_validity(&task)?;
|
|
||||||
|
|
||||||
// Get rid of the mutability.
|
pub fn register_raw_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
|
||||||
let task = task;
|
self.all_tasks.append(wtxn, &BEU32::new(task.uid), &task)?;
|
||||||
|
|
||||||
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
|
|
||||||
|
|
||||||
for index in task.indexes() {
|
for index in task.indexes() {
|
||||||
self.update_index(&mut wtxn, index, |bitmap| {
|
self.update_index(wtxn, index, |bitmap| {
|
||||||
bitmap.insert(task.uid);
|
bitmap.insert(task.uid);
|
||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
|
self.update_status(wtxn, Status::Enqueued, |bitmap| {
|
||||||
bitmap.insert(task.uid);
|
bitmap.insert(task.uid);
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
|
self.update_kind(wtxn, task.kind.as_kind(), |bitmap| {
|
||||||
bitmap.insert(task.uid);
|
bitmap.insert(task.uid);
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?;
|
utils::insert_task_datetime(wtxn, self.enqueued_at, task.enqueued_at, task.uid)
|
||||||
|
|
||||||
if let Err(e) = wtxn.commit() {
|
|
||||||
self.delete_persisted_task_data(&task)?;
|
|
||||||
return Err(e.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the registered task is a task cancelation
|
|
||||||
// we inform the processing tasks to stop (if necessary).
|
|
||||||
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
|
|
||||||
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
|
|
||||||
if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel)
|
|
||||||
{
|
|
||||||
self.must_stop_processing.must_stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// notify the scheduler loop to execute a new tick
|
|
||||||
self.wake_up.signal();
|
|
||||||
|
|
||||||
Ok(task)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new task coming from a dump in the scheduler.
|
/// Register a new task coming from a dump in the scheduler.
|
||||||
@ -1046,7 +1129,7 @@ impl IndexScheduler {
|
|||||||
/// 6. Reset the in-memory list of processed tasks.
|
/// 6. Reset the in-memory list of processed tasks.
|
||||||
///
|
///
|
||||||
/// Returns the number of processed tasks.
|
/// Returns the number of processed tasks.
|
||||||
fn tick(&self) -> Result<TickOutcome> {
|
async fn tick(&self) -> Result<TickOutcome> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
{
|
{
|
||||||
*self.run_loop_iteration.write().unwrap() += 1;
|
*self.run_loop_iteration.write().unwrap() += 1;
|
||||||
@ -1055,7 +1138,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
puffin::GlobalProfiler::lock().new_frame();
|
puffin::GlobalProfiler::lock().new_frame();
|
||||||
|
|
||||||
self.cleanup_task_queue()?;
|
self.cleanup_task_queue().await?;
|
||||||
|
|
||||||
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
||||||
let batch =
|
let batch =
|
||||||
@ -1194,7 +1277,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Register a task to cleanup the task queue if needed
|
/// Register a task to cleanup the task queue if needed
|
||||||
fn cleanup_task_queue(&self) -> Result<()> {
|
async fn cleanup_task_queue(&self) -> Result<()> {
|
||||||
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
||||||
|
|
||||||
let nb_tasks = self.all_task_ids(&rtxn)?.len();
|
let nb_tasks = self.all_task_ids(&rtxn)?.len();
|
||||||
@ -1237,7 +1320,8 @@ impl IndexScheduler {
|
|||||||
delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?,
|
delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?,
|
||||||
),
|
),
|
||||||
tasks: to_delete,
|
tasks: to_delete,
|
||||||
})?;
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ impl AuthController {
|
|||||||
dbg!(controller_clone.store.delete_api_key(uuid).unwrap());
|
dbg!(controller_clone.store.delete_api_key(uuid).unwrap());
|
||||||
}
|
}
|
||||||
zk::EventType::NodeCreated | zk::EventType::NodeDataChanged => {
|
zk::EventType::NodeCreated | zk::EventType::NodeDataChanged => {
|
||||||
let (key, stat) = zk.get_data(&path).await.unwrap();
|
let (key, _stat) = zk.get_data(&path).await.unwrap();
|
||||||
let key: Key = serde_json::from_slice(&key).unwrap();
|
let key: Key = serde_json::from_slice(&key).unwrap();
|
||||||
log::info!("The key {} has been deleted", key.uid);
|
log::info!("The key {} has been deleted", key.uid);
|
||||||
|
|
||||||
|
@ -203,15 +203,16 @@ pub async fn setup_meilisearch(
|
|||||||
if let ScheduleSnapshot::Enabled(snapshot_delay) = opt.schedule_snapshot {
|
if let ScheduleSnapshot::Enabled(snapshot_delay) = opt.schedule_snapshot {
|
||||||
let snapshot_delay = Duration::from_secs(snapshot_delay);
|
let snapshot_delay = Duration::from_secs(snapshot_delay);
|
||||||
let index_scheduler = index_scheduler.clone();
|
let index_scheduler = index_scheduler.clone();
|
||||||
thread::Builder::new()
|
tokio::task::spawn(async move {
|
||||||
.name(String::from("register-snapshot-tasks"))
|
loop {
|
||||||
.spawn(move || loop {
|
|
||||||
thread::sleep(snapshot_delay);
|
thread::sleep(snapshot_delay);
|
||||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation).await {
|
||||||
error!("Error while registering snapshot: {}", e);
|
error!("Error while registering snapshot: {}", e);
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.unwrap();
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((index_scheduler, auth_controller))
|
Ok((index_scheduler, auth_controller))
|
||||||
@ -225,31 +226,32 @@ async fn open_or_create_database_unchecked(
|
|||||||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||||
// we don't want to create anything in the data.ms yet, thus we
|
// we don't want to create anything in the data.ms yet, thus we
|
||||||
// wrap our two builders in a closure that'll be executed later.
|
// wrap our two builders in a closure that'll be executed later.
|
||||||
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zk);
|
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key, zk.clone());
|
||||||
let instance_features = opt.to_instance_features();
|
let instance_features = opt.to_instance_features();
|
||||||
let index_scheduler_builder = || -> anyhow::Result<_> {
|
let index_scheduler = IndexScheduler::new(IndexSchedulerOptions {
|
||||||
Ok(IndexScheduler::new(IndexSchedulerOptions {
|
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
|
||||||
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
|
auth_path: opt.db_path.join("auth"),
|
||||||
auth_path: opt.db_path.join("auth"),
|
tasks_path: opt.db_path.join("tasks"),
|
||||||
tasks_path: opt.db_path.join("tasks"),
|
update_file_path: opt.db_path.join("update_files"),
|
||||||
update_file_path: opt.db_path.join("update_files"),
|
indexes_path: opt.db_path.join("indexes"),
|
||||||
indexes_path: opt.db_path.join("indexes"),
|
snapshots_path: opt.snapshot_dir.clone(),
|
||||||
snapshots_path: opt.snapshot_dir.clone(),
|
dumps_path: opt.dump_dir.clone(),
|
||||||
dumps_path: opt.dump_dir.clone(),
|
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
||||||
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||||
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
indexer_config: (&opt.indexer_options).try_into()?,
|
||||||
indexer_config: (&opt.indexer_options).try_into()?,
|
autobatching_enabled: true,
|
||||||
autobatching_enabled: true,
|
max_number_of_tasks: 1_000_000,
|
||||||
max_number_of_tasks: 1_000_000,
|
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
index_count: DEFAULT_INDEX_COUNT,
|
||||||
index_count: DEFAULT_INDEX_COUNT,
|
instance_features,
|
||||||
instance_features,
|
zk: zk.clone(),
|
||||||
})?)
|
})
|
||||||
};
|
.await
|
||||||
|
.map_err(anyhow::Error::from);
|
||||||
|
|
||||||
match (
|
match (
|
||||||
index_scheduler_builder(),
|
index_scheduler,
|
||||||
auth_controller.await.map_err(anyhow::Error::from),
|
auth_controller.await.map_err(anyhow::Error::from),
|
||||||
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
|
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
|
||||||
) {
|
) {
|
||||||
|
@ -29,8 +29,7 @@ pub async fn create_dump(
|
|||||||
keys: auth_controller.list_keys()?,
|
keys: auth_controller.list_keys()?,
|
||||||
instance_uid: analytics.instance_uid().cloned(),
|
instance_uid: analytics.instance_uid().cloned(),
|
||||||
};
|
};
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -129,8 +129,7 @@ pub async fn delete_document(
|
|||||||
index_uid: index_uid.to_string(),
|
index_uid: index_uid.to_string(),
|
||||||
documents_ids: vec![document_id],
|
documents_ids: vec![document_id],
|
||||||
};
|
};
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
@ -445,7 +444,7 @@ async fn document_addition(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let scheduler = index_scheduler.clone();
|
let scheduler = index_scheduler.clone();
|
||||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
|
let task = match scheduler.register(task).await {
|
||||||
Ok(task) => task,
|
Ok(task) => task,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
index_scheduler.delete_update_file(uuid)?;
|
index_scheduler.delete_update_file(uuid)?;
|
||||||
@ -476,8 +475,7 @@ pub async fn delete_documents_batch(
|
|||||||
|
|
||||||
let task =
|
let task =
|
||||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -512,8 +510,7 @@ pub async fn delete_documents_by_filter(
|
|||||||
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
||||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||||
|
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -529,8 +526,7 @@ pub async fn clear_all_documents(
|
|||||||
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
||||||
|
|
||||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -135,8 +135,7 @@ pub async fn create_index(
|
|||||||
);
|
);
|
||||||
|
|
||||||
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
} else {
|
} else {
|
||||||
@ -203,8 +202,7 @@ pub async fn update_index(
|
|||||||
primary_key: body.primary_key,
|
primary_key: body.primary_key,
|
||||||
};
|
};
|
||||||
|
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -216,8 +214,7 @@ pub async fn delete_index(
|
|||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
|
@ -55,10 +55,7 @@ macro_rules! make_setting_route {
|
|||||||
is_deletion: true,
|
is_deletion: true,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
|
||||||
.await??
|
|
||||||
.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -97,10 +94,7 @@ macro_rules! make_setting_route {
|
|||||||
is_deletion: false,
|
is_deletion: false,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
|
||||||
.await??
|
|
||||||
.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -586,8 +580,7 @@ pub async fn update_all(
|
|||||||
is_deletion: false,
|
is_deletion: false,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -622,8 +615,7 @@ pub async fn delete_all(
|
|||||||
is_deletion: true,
|
is_deletion: true,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView = index_scheduler.register(task).await?.into();
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
|
||||||
|
|
||||||
debug!("returns: {:?}", task);
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -61,7 +61,7 @@ pub async fn swap_indexes(
|
|||||||
|
|
||||||
let task = KindWithContent::IndexSwap { swaps };
|
let task = KindWithContent::IndexSwap { swaps };
|
||||||
|
|
||||||
let task = index_scheduler.register(task)?;
|
let task = index_scheduler.register(task).await?;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
|
@ -333,7 +333,7 @@ async fn cancel_tasks(
|
|||||||
let task_cancelation =
|
let task_cancelation =
|
||||||
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
||||||
|
|
||||||
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
|
let task = index_scheduler.register(task_cancelation).await?;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(task))
|
Ok(HttpResponse::Ok().json(task))
|
||||||
@ -378,7 +378,7 @@ async fn delete_tasks(
|
|||||||
let task_deletion =
|
let task_deletion =
|
||||||
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
||||||
|
|
||||||
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
|
let task = index_scheduler.register(task_deletion).await?;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(task))
|
Ok(HttpResponse::Ok().json(task))
|
||||||
|
Loading…
Reference in New Issue
Block a user