Mirror of https://github.com/meilisearch/MeiliSearch (synced 2025-07-04 20:37:15 +02:00)
WIP moving to the sync zookeeper API

Parent: 854745c670
Commit: 0c7d7c68bc

17 changed files with 653 additions and 750 deletions
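The gist of the change, condensed into one call: the async `zookeeper-client` crate (awaited futures, builder-style options) is swapped for the synchronous `zookeeper` crate, whose calls take the ACL and create mode directly and block. A minimal sketch under that assumption, using only calls that appear in this diff; the helper name is ours, not part of the codebase:

use zookeeper::{Acl, CreateMode, ZkError, ZooKeeper};

// Create a root node, tolerating the case where another instance already did.
// Mirrors what the diff does for "/election", "/snapshots" and "/tasks".
fn ensure_root_node(zookeeper: &ZooKeeper, path: &str) -> Result<(), ZkError> {
    match zookeeper.create(
        path,
        vec![],                     // empty payload, as in the diff
        Acl::open_unsafe().clone(), // world-accessible ACL
        CreateMode::Persistent,
    ) {
        // An existing node is fine: creation is effectively idempotent here.
        Ok(_) | Err(ZkError::NodeExists) => Ok(()),
        Err(e) => Err(e),
    }
}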
@@ -32,7 +32,7 @@ thiserror = "1.0.40"
 time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
 uuid = { version = "1.3.1", features = ["serde", "v4"] }
 tokio = { version = "1.27.0", features = ["full"] }
-zookeeper-client = "0.5.0"
+zookeeper = "0.8.0"

 [dev-dependencies]
 big_s = "1.0.2"
@@ -58,7 +58,10 @@ use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
 use uuid::Uuid;
-use zookeeper_client as zk;
+use zookeeper::recipes::leader::LeaderLatch;
+use zookeeper::{
+    Acl, AddWatchMode, CreateMode, WatchedEvent, WatchedEventType, ZkError, ZooKeeper,
+};

 use crate::index_mapper::IndexMapper;
 use crate::utils::{check_index_swap_validity, clamp_to_page_size};
@@ -234,7 +237,6 @@ pub enum Breakpoint {
InsideProcessBatch,
}

#[derive(Debug)]
pub struct IndexSchedulerOptions {
/// The path to the version file of Meilisearch.
pub version_file_path: PathBuf,
@@ -271,7 +273,7 @@
 /// The experimental features enabled for this instance.
 pub instance_features: InstanceTogglableFeatures,
 /// zookeeper client
-pub zk: Option<zk::Client>,
+pub zookeeper: Option<Arc<ZooKeeper>>,
 }

 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -341,7 +343,7 @@
 pub(crate) version_file_path: PathBuf,

 /// The URL to the ZooKeeper cluster
-pub(crate) zk: Option<zk::Client>,
+pub(crate) zookeeper: Option<Arc<ZooKeeper>>,

 // ================= test
 // The next entry is dedicated to the tests.
@@ -384,7 +386,7 @@
 snapshots_path: self.snapshots_path.clone(),
 dumps_path: self.dumps_path.clone(),
 auth_path: self.auth_path.clone(),
-zk: self.zk.clone(),
+zookeeper: self.zookeeper.clone(),
 version_file_path: self.version_file_path.clone(),
 #[cfg(test)]
 test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@@ -399,7 +401,7 @@

 impl IndexScheduler {
 /// Create an index scheduler and start its run loop.
-pub async fn new(
+pub fn new(
 options: IndexSchedulerOptions,
 #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
 #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
@@ -481,7 +483,7 @@
 snapshots_path: options.snapshots_path,
 auth_path: options.auth_path,
 version_file_path: options.version_file_path,
-zk: options.zk,
+zookeeper: options.zookeeper,
 #[cfg(test)]
 test_breakpoint_sdr,
 #[cfg(test)]
@@ -492,19 +494,30 @@
 };

 // initialize the directories we need to process batches.
-if let Some(ref zk) = this.zk {
-let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
-match zk.create("/election", &[], &options).await {
-Ok(_) | Err(zk::Error::NodeExists) => (),
+if let Some(zookeeper) = &this.zookeeper {
+match zookeeper.create(
+"/election",
+vec![],
+Acl::open_unsafe().clone(),
+CreateMode::Persistent,
+) {
+Ok(_) | Err(ZkError::NodeExists) => (),
 Err(e) => panic!("{e}"),
 }

-match zk.create("/snapshots", &[], &options).await {
-Ok(_) | Err(zk::Error::NodeExists) => (),
+match zookeeper.create(
+"/snapshots",
+vec![],
+Acl::open_unsafe().clone(),
+CreateMode::Persistent,
+) {
+Ok(_) | Err(ZkError::NodeExists) => (),
 Err(e) => panic!("{e}"),
 }
 }
-this.run().await;
+
+this.run();

 Ok(this)
 }
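The `/election` node created above backs the leader-election recipe that the new run loop drives through `zookeeper::recipes::leader::LeaderLatch` (see the next hunk). A hedged sketch of that pattern, assuming the `LeaderLatch` API exactly as it appears in this diff; the two helper functions are illustrative, not Meilisearch code:

use std::sync::Arc;
use uuid::Uuid;
use zookeeper::recipes::leader::LeaderLatch;
use zookeeper::ZooKeeper;

// Every instance joins "/election" under a random id; ZooKeeper picks the leader.
fn start_election(zookeeper: Arc<ZooKeeper>) -> LeaderLatch {
    let id = Uuid::new_v4().to_string();
    let latch = LeaderLatch::new(zookeeper, id, "/election".to_string());
    // The diff unwraps here as well; a real caller would surface the error.
    latch.start().unwrap();
    latch
}

// The diff branches on this inside its "/snapshots" watcher: the leader produces
// snapshots, followers import them.
fn describe_role(latch: &LeaderLatch) -> &'static str {
    if latch.has_leadership() {
        "leader"
    } else {
        "follower"
    }
}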
@@ -592,315 +605,119 @@ impl IndexScheduler {
///
/// This function will execute in a different thread and must be called
/// only once per index scheduler.
async fn run(&self) {
let run = self.private_clone();
let zk = self.zk.clone();
let mut self_node_id = zk::CreateSequence(0);
tokio::task::spawn(async move {
#[cfg(test)]
run.breakpoint(Breakpoint::Init);
fn run(&self) {
#[cfg(test)]
run.breakpoint(Breakpoint::Init);

if let Some(zookeeper) = self.zookeeper.clone() {
let id = Uuid::new_v4().to_string();
let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
latch.start().unwrap();

// Join the potential leaders list.
// The lowest in the list is the leader. And if we're not the leader
// we watch the node right before us to be notified if he dies.
// See https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection
let mut watchers = if let Some(ref zk) = zk {
let options = zk::CreateMode::EphemeralSequential.with_acls(zk::Acls::anyone_all());
let (_stat, id) = zk.create("/election/node-", &[], &options).await.unwrap();
self_node_id = id;
let previous_path = {
let mut list = zk.list_children("/election").await.unwrap();
list.sort();
let latchc = latch.clone();
let this = self.private_clone();
zookeeper
.add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| {
if !latchc.has_leadership() {
let WatchedEvent { event_type, path, keeper_state: _ } = event;
match event_type {
WatchedEventType::NodeCreated => {
let path = path.unwrap();
log::info!("The snapshot {} is in preparation", path);
}
WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
log::info!("Importing snapshot {}", path);
let snapshot_id =
path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir = PathBuf::from(format!(
"{}/zk-snapshots/{}",
env!("HOME"),
snapshot_id
));

let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| path < &self_node_path).last();
previous_path.map(|path| format!("/election/{}", path))
};
// 1. TODO: Ensure the snapshot version file is the same as our version.

if let Some(previous_path) = previous_path {
log::warn!("I am the follower {}", self_node_id);
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
zk.watch("/snapshots", zk::AddWatchMode::PersistentRecursive)
.await
.unwrap(),
))
} else {
// if there was no node before ourselves, then we're the leader.
log::warn!("I'm the leader");
None
}
} else {
log::warn!("I don't have any ZK cluster");
None
};
// 2. Download all the databases
let tasks_file =
tempfile::NamedTempFile::new_in(this.env.path()).unwrap();

loop {
match watchers.as_mut() {
Some((leader_watcher, snapshot_watcher)) => {
// We wait for a new batch processed by the leader OR a disconnection from the leader.
tokio::select! {
zk::WatchedEvent { event_type, session_state, .. } = leader_watcher.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeDeleted => {
// The node behind us has been disconnected,
// am I the leader or is there someone before me.
let zk = zk.as_ref().unwrap();
let previous_path = {
let mut list = zk.list_children("/election").await.unwrap();
list.sort();
log::info!("Downloading the index scheduler database.");
let tasks_snapshot = snapshot_dir.join("tasks.mdb");
std::fs::copy(tasks_snapshot, tasks_file).unwrap();

let self_node_path = format!("node-{}", self_node_id);
let previous_path =
list.into_iter().take_while(|path| path < &self_node_path).last();
previous_path.map(|path| format!("/election/{}", path))
};
log::info!("Downloading the indexes databases");
let indexes_files =
tempfile::TempDir::new_in(&this.index_mapper.base_path)
.unwrap();
let mut indexes = Vec::new();

let (leader_watcher, snapshot_watcher) = watchers.take().unwrap();
leader_watcher.remove().await.unwrap();
watchers = if let Some(previous_path) = previous_path {
log::warn!("I stay a follower {}", self_node_id);
Some((
zk.watch(&previous_path, zk::AddWatchMode::Persistent).await.unwrap(),
snapshot_watcher,
))
} else {
log::warn!("I'm the new leader");
snapshot_watcher.remove().await.unwrap();
None
}
let dst = snapshot_dir.join("indexes");
for result in std::fs::read_dir(&dst).unwrap() {
let entry = result.unwrap();
let uuid =
entry.file_name().as_os_str().to_str().unwrap().to_string();
log::info!("\tDownloading the index {}", uuid.to_string());
std::fs::copy(
dst.join(&uuid),
indexes_files.path().join(&uuid),
)
.unwrap();
indexes.push(uuid);
}
_ => (),
},
zk::WatchedEvent { event_type, session_state, path } = snapshot_watcher.changed() => match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
zk::EventType::NodeCreated => {
log::info!("The snapshot {} is in preparation", path);
}
zk::EventType::NodeDataChanged => {
log::info!("Importing snapshot {}", path);

let snapshot_id = path.strip_prefix("/snapshots/snapshot-").unwrap();
let snapshot_dir =
PathBuf::from(format!("{}/zk-snapshots/{}", env!("HOME"), snapshot_id));
// 3. Lock the index-mapper and close all the env
// TODO: continue here

// 1. TODO: Ensure the snapshot version file is the same as our version.
// run.env.close();

// 2. Download all the databases
let tasks_file = tempfile::NamedTempFile::new_in(run.env.path()).unwrap();
// 4. Move all the databases

log::info!("Downloading the index scheduler database.");
|
||||
let tasks_snapshot =
|
||||
snapshot_dir.join("tasks.mdb");
|
||||
std::fs::copy(tasks_snapshot, tasks_file).unwrap();
|
||||
// 5. Unlock the index-mapper
|
||||
|
||||
// 2. Download and import the index-scheduler database
|
||||
|
||||
log::info!("Downloading the indexes databases");
|
||||
let indexes_files = tempfile::TempDir::new_in(&run.index_mapper.base_path).unwrap();
|
||||
let mut indexes = Vec::new();
|
||||
|
||||
let dst = snapshot_dir.join("indexes");
|
||||
let mut indexes_snapshot = tokio::fs::read_dir(&dst).await.unwrap();
|
||||
while let Some(file) = indexes_snapshot.next_entry().await.unwrap() {
|
||||
let uuid = file.file_name().as_os_str().to_str().unwrap().to_string();
|
||||
log::info!("\tDownloading the index {}", uuid.to_string());
|
||||
std::fs::copy(dst.join(&uuid), indexes_files.path().join(&uuid)).unwrap();
|
||||
indexes.push(uuid);
|
||||
}
|
||||
|
||||
// 3. Lock the index-mapper and close all the env
|
||||
// TODO: continue here
|
||||
|
||||
|
||||
|
||||
// run.env.close();
|
||||
|
||||
// 4. Move all the databases
|
||||
|
||||
// 5. Unlock the index-mapper
|
||||
|
||||
// 2. Download and import the index-scheduler database
|
||||
|
||||
// 3. Snapshot every indexes
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
else => break,
|
||||
// 3. Snapshot every indexes
|
||||
}
|
||||
otherwise => panic!("{otherwise:?}"),
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// we're either a leader or not running in a cluster,
|
||||
// either way we should wait until we receive a task.
|
||||
let wake_up = run.wake_up.clone();
|
||||
let _ = tokio::task::spawn_blocking(move || wake_up.wait()).await;
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
match run.tick().await {
|
||||
Ok(TickOutcome::TickAgain(n)) => {
|
||||
// We must tick again.
|
||||
run.wake_up.signal();
|
||||
|
||||
// if we're in a cluster that means we're the leader
|
||||
// and should share a snapshot of what we've done.
|
||||
if let Some(ref zk) = run.zk {
|
||||
// if nothing was processed we have nothing to do.
|
||||
if n == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let options = zk::CreateMode::EphemeralSequential
|
||||
.with_acls(zk::Acls::anyone_all());
|
||||
let (_stat, snapshot_id) = zk
|
||||
.create("/snapshots/snapshot-", &[], &options)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
|
||||
tokio::fs::create_dir_all(&zk_snapshots).await.unwrap();
|
||||
let snapshot_dir =
|
||||
PathBuf::from(format!("{zk_snapshots}/{snapshot_id}"));
|
||||
tokio::fs::create_dir(&snapshot_dir).await.unwrap();
|
||||
|
||||
// 1. Snapshot the version file.
|
||||
let dst =
|
||||
snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
|
||||
tokio::fs::copy(&run.version_file_path, dst).await.unwrap();
|
||||
|
||||
// 2. Snapshot the index-scheduler LMDB env
|
||||
let dst = snapshot_dir.join("tasks");
|
||||
tokio::fs::create_dir_all(&dst).await.unwrap();
|
||||
|
||||
log::info!("Snapshotting the tasks");
|
||||
let env = run.env.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
env.copy_to_path(
|
||||
dst.join("tasks.mdb"),
|
||||
heed::CompactionOption::Enabled,
|
||||
)
|
||||
.unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 3. Snapshot every indexes
log::info!("Snapshotting the indexes");
let dst = snapshot_dir.join("indexes");
tokio::fs::create_dir_all(&dst).await.unwrap();

let this = run.private_clone();
let indexes = tokio::task::spawn_blocking(move || {
let rtxn = this.env.read_txn().unwrap();
this.index_mapper
.index_mapping
.iter(&rtxn)
.unwrap()
.map(|ret| ret.unwrap())
.map(|(name, uuid)| (name.to_string(), uuid))
.collect::<Vec<_>>()
})
.await
.unwrap();
for (name, uuid) in indexes {
log::info!("  Snapshotting index {name}");
let this = run.private_clone();
let dst = dst.clone();
tokio::task::spawn_blocking(move || {
let rtxn = this.env.read_txn().unwrap();
let index =
this.index_mapper.index(&rtxn, &name).unwrap();
index
.copy_to_path(
dst.join(format!("{uuid}.mdb")),
heed::CompactionOption::Enabled,
)
.unwrap();
})
.await
.unwrap();
}

// we must notify everyone that we dropped a new snapshot on the s3
let _stat = zk
.set_data(
&format!("/snapshots/snapshot-{}", snapshot_id),
&[],
None,
)
.await
.unwrap();
log::info!(
"Notified everyone about the new snapshot {snapshot_id}"
);

// We can now delete all the tasks that has been processed
let processed = run
.processing_tasks
.read()
.unwrap()
.processed_previously()
.clone(); // we don't want to hold the mutex
log::info!("Deleting {} processed tasks", processed.len());
for task in processed {
let _ = zk // we don't want to crash if we can't delete an update file.
.delete(
&format!(
"/tasks/task-{}",
zk::CreateSequence(task as i32)
),
None,
)
.await;
// TODO: Delete the update files associated with the deleted tasks
}
}
}
Ok(TickOutcome::WaitForSignal) => (),
Err(e) => {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
if !e.is_recoverable() {
std::thread::sleep(Duration::from_secs(1));
}
}
}
}
}
}
});

if let Some(ref zk) = &self.zk {
let options = zk::CreateMode::Persistent.with_acls(zk::Acls::anyone_all());
match zk.create("/tasks", &[], &options).await {
match zookeeper.create(
"/tasks",
vec![],
Acl::open_unsafe().clone(),
CreateMode::Persistent,
) {
Ok(_) => (),
Err(zk::Error::NodeExists) => {
Err(ZkError::NodeExists) => {
log::warn!("Tasks directory already exists, we're going to import all the tasks on the zk without altering the tasks already on disk.");

let children = zk
.list_children("/tasks")
.await
.expect("Internal, the /tasks directory was deleted during execution.");
let children = zookeeper
.get_children("/tasks", false)
.expect("Internal, the /tasks directory was deleted during execution."); // TODO change me

log::info!("Importing {} tasks", children.len());
for path in children {
log::info!("  Importing {}", path);
match zk.get_data(&format!("/tasks/{}", &path)).await {
match zookeeper.get_data(&format!("/tasks/{}", &path), false) {
Ok((task, _stat)) => {
if task.is_empty() {
log::info!("  Task {} was empty, skipping.", path);
continue;
}
let task = serde_json::from_slice(&task).unwrap();

let this = self.private_clone();
tokio::task::spawn_blocking(move || {
let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
} else {
let task = serde_json::from_slice(&task).unwrap();
let mut wtxn = self.env.write_txn().unwrap();
self.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
// we received a new tasks, we must wake up
this.wake_up.signal();
})
.await
.unwrap();
self.wake_up.signal();
}
}
Err(e) => panic!("{e}"),
}
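Both the follower path above and the `/tasks` handling below hinge on the same primitive: one persistent, recursive watch whose callback fires for every event under a subtree. A minimal sketch of that registration, assuming the `add_watch`/`AddWatchMode` API exactly as this diff uses it; the callback body only logs, where the real code fetches the znode data and registers the task or imports the snapshot:

use std::sync::Arc;
use zookeeper::{AddWatchMode, WatchedEvent, WatchedEventType, ZooKeeper};

fn watch_subtree(zookeeper: &Arc<ZooKeeper>, root: &str) {
    zookeeper
        .add_watch(root, AddWatchMode::PersistentRecursive, move |event| {
            let WatchedEvent { event_type, path, keeper_state: _ } = event;
            match event_type {
                // In the diff, NodeCreated announces a snapshot in preparation and
                // NodeDataChanged means "ready to import" (or "new task pushed").
                WatchedEventType::NodeCreated => log::info!("created: {:?}", path),
                WatchedEventType::NodeDataChanged => log::info!("changed: {:?}", path),
                other => log::debug!("ignored event: {:?}", other),
            }
        })
        .unwrap(); // the diff unwraps the registration result as well
}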
@@ -913,38 +730,154 @@ impl IndexScheduler {
}

// TODO: fix unwrap by returning a clear error.
let mut watcher =
zk.watch("/tasks", zk::AddWatchMode::PersistentRecursive).await.unwrap();
let this = self.private_clone();
tokio::spawn(async move {
loop {
let zk::WatchedEvent { event_type, session_state, path } =
watcher.changed().await;
zookeeper
.add_watch("/tasks", AddWatchMode::PersistentRecursive, move |event| {
let WatchedEvent { event_type, path, keeper_state: _ } = event;
match event_type {
zk::EventType::Session => panic!("Session error {:?}", session_state),
// A task as been added
zk::EventType::NodeDataChanged => {
WatchedEventType::NodeDataChanged => {
let path = path.unwrap();
// Add raw task content in local DB
log::info!("Received a new task from the cluster at {}", path);
let (data, _stat) =
this.zk.as_ref().unwrap().get_data(&path).await.unwrap();
let task = serde_json::from_slice(&data).unwrap();
this.zookeeper.as_ref().unwrap().get_data(&path, false).unwrap();
let task = serde_json::from_slice(data.as_slice()).unwrap();
let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
}
otherwise => panic!("{otherwise:?}"),
}

this.wake_up.signal();
})
.unwrap();
}

let this = self.private_clone();
std::thread::spawn(move || {
loop {
// we're either a leader or not running in a cluster,
// either way we should wait until we receive a task.
let wake_up = this.wake_up.clone();
let _ = wake_up.wait();

match this.tick() {
Ok(TickOutcome::TickAgain(n)) => {
// We must tick again.
this.wake_up.signal();

// if we're in a cluster that means we're the leader
// and should share a snapshot of what we've done.
if let Some(ref zookeeper) = this.zookeeper {
// if nothing was processed we have nothing to do.
if n == 0 {
continue;
}

let snapshot_id = zookeeper
.create(
"/snapshots/snapshot-",
vec![],
Acl::open_unsafe().clone(),
CreateMode::PersistentSequential,
)
.unwrap();

dbg!(&snapshot_id);
let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
std::fs::create_dir_all(&zk_snapshots).unwrap();
let snapshot_dir =
PathBuf::from(format!("{zk_snapshots}/{snapshot_id}"));
std::fs::create_dir(&snapshot_dir).unwrap();

// 1. Snapshot the version file.
let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
std::fs::copy(&this.version_file_path, dst).unwrap();

// 2. Snapshot the index-scheduler LMDB env
let dst = snapshot_dir.join("tasks");
std::fs::create_dir_all(&dst).unwrap();

log::info!("Snapshotting the tasks");
let env = this.env.clone();
env.copy_to_path(
dst.join("tasks.mdb"),
heed::CompactionOption::Enabled,
)
.unwrap();

// 3. Snapshot every indexes
log::info!("Snapshotting the indexes");
let dst = snapshot_dir.join("indexes");
std::fs::create_dir_all(&dst).unwrap();

let this = this.private_clone();
tokio::task::spawn_blocking(move || {
let mut wtxn = this.env.write_txn().unwrap();
this.register_raw_task(&mut wtxn, &task).unwrap();
wtxn.commit().unwrap();
})
.await
.unwrap();
let rtxn = this.env.read_txn().unwrap();
let indexes = this
.index_mapper
.index_mapping
.iter(&rtxn)
.unwrap()
.map(|ret| ret.unwrap())
.map(|(name, uuid)| (name.to_string(), uuid))
.collect::<Vec<_>>();

for (name, uuid) in indexes {
log::info!("  Snapshotting index {name}");
let this = this.private_clone();
let dst = dst.clone();
let rtxn = this.env.read_txn().unwrap();
let index = this.index_mapper.index(&rtxn, &name).unwrap();
index
.copy_to_path(
dst.join(format!("{uuid}.mdb")),
heed::CompactionOption::Enabled,
)
.unwrap();
}

// we must notify everyone that we dropped a new snapshot on the s3
let _stat = zookeeper.set_data(
&format!("/snapshots/snapshot-{}", snapshot_id),
vec![],
None,
);
log::info!("Notified everyone about the new snapshot {snapshot_id}");

// We can now delete all the tasks that has been processed
let processed = this
.processing_tasks
.read()
.unwrap()
.processed_previously()
.clone(); // we don't want to hold the mutex
log::info!("Deleting {} processed tasks", processed.len());
for task in processed {
let _ = zookeeper // we don't want to crash if we can't delete an update file.
.delete(
&format!(
"/tasks/task-{:0>10}",
task as i32
),
None,
)
.unwrap();
// TODO: Delete the update files associated with the deleted tasks
}
}
}
Ok(TickOutcome::WaitForSignal) => (),
Err(e) => {
log::error!("{}", e);
// Wait one second when an irrecoverable error occurs.
if !e.is_recoverable() {
std::thread::sleep(Duration::from_secs(1));
}
_ => (),
}
this.wake_up.signal();
}
});
}
}
});
}

pub fn indexer_config(&self) -> &IndexerConfig {
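The leader side just above follows a publish-then-notify sequence: reserve a sequential `/snapshots/snapshot-` znode, copy the LMDB environments into shared storage, then write to the node so every follower's persistent watch on `/snapshots` fires with `NodeDataChanged`. A condensed sketch of the two ZooKeeper calls involved (the filesystem copies are elided); assumes the sync API exactly as the diff uses it, and the function name is illustrative:

use zookeeper::{Acl, CreateMode, ZkError, ZooKeeper};

fn announce_snapshot(zookeeper: &ZooKeeper) -> Result<String, ZkError> {
    // ZooKeeper appends the sequence number: e.g. "/snapshots/snapshot-0000000007".
    let path = zookeeper.create(
        "/snapshots/snapshot-",
        vec![],
        Acl::open_unsafe().clone(),
        CreateMode::PersistentSequential,
    )?;

    // ... copy tasks.mdb and the index envs into shared storage here ...

    // Touching the node's data is the broadcast: followers watching "/snapshots"
    // receive NodeDataChanged for this exact path and start importing.
    zookeeper.set_data(&path, vec![], None)?;
    Ok(path)
}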
@@ -1279,14 +1212,17 @@ impl IndexScheduler {
 /// Register a new task in the scheduler.
 ///
 /// If it fails and data was associated with the task, it tries to delete the associated data.
-pub async fn register(&self, kind: KindWithContent) -> Result<Task> {
-let id = match self.zk {
-Some(ref zk) => {
-// reserve uniq ID on zookeeper. And give it to the spawn blocking.
-let options =
-zk::CreateMode::PersistentSequential.with_acls(zk::Acls::anyone_all());
-match zk.create("/tasks/task-", &[], &options).await {
-Ok((_stats, id)) => Some(id),
+pub fn register(&self, kind: KindWithContent) -> Result<Task> {
+let id = match &self.zookeeper {
+Some(zookeeper) => {
+// Reserve uniq ID on zookeeper. And give it to the spawn blocking.
+match zookeeper.create(
+"/tasks/task-",
+vec![],
+Acl::open_unsafe().clone(),
+CreateMode::PersistentSequential,
+) {
+Ok(path) => path.rsplit_once('-').map(|(_, id)| id.parse::<u32>().unwrap()),
 Err(e) => panic!("{e}"),
 }
 }
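With the sync API, `register` now gets its task uid straight from ZooKeeper: creating `/tasks/task-` with `CreateMode::PersistentSequential` returns a path such as `/tasks/task-0000000042`, and the zero-padded suffix is parsed into the uid (the same `{:0>10}` padding is rebuilt later when processed tasks are deleted). A small std-only illustration of that parsing step; the sample path is made up:

fn task_uid_from_znode_path(path: &str) -> Option<u32> {
    // Keep only what follows the last '-': the zero-padded sequence number.
    path.rsplit_once('-')
        .and_then(|(_, id)| id.parse::<u32>().ok())
}

fn main() {
    assert_eq!(task_uid_from_znode_path("/tasks/task-0000000042"), Some(42));
    // The deletion side reconstructs the same name with "{:0>10}" padding.
    assert_eq!(format!("/tasks/task-{:0>10}", 42u32), "/tasks/task-0000000042");
}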
@@ -1294,80 +1230,69 @@ impl IndexScheduler {
};

let this = self.private_clone();
let task = tokio::task::spawn_blocking(move || {
let mut wtxn = this.env.write_txn()?;
let mut wtxn = this.env.write_txn()?;

// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
&& (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
&& (this.env.non_free_pages_size()? * 100) / this.env.map_size()? as u64 > 50
{
return Err(Error::NoSpaceLeftInTaskQueue);
}

// Retrieve the id generated by zookeeper or generate a local id.
let id = match id {
Some(id) => id as u32,
None => this.next_task_id(&wtxn)?,
};

let mut task = Task {
uid: id,
enqueued_at: OffsetDateTime::now_utc(),
started_at: None,
finished_at: None,
error: None,
canceled_by: None,
details: kind.default_details(),
status: Status::Enqueued,
kind: kind.clone(),
};
// For deletion and cancelation tasks, we want to make extra sure that they
// don't attempt to delete/cancel tasks that are newer than themselves.
filter_out_references_to_newer_tasks(&mut task);
// If the register task is an index swap task, verify that it is well-formed
// (that it does not contain duplicate indexes).
check_index_swap_validity(&task)?;

this.register_raw_task(&mut wtxn, &task)?;

if let Err(e) = wtxn.commit() {
this.delete_persisted_task_data(&task)?;
return Err(e.into());
}

// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
if this.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel)
{
return Err(Error::NoSpaceLeftInTaskQueue);
this.must_stop_processing.must_stop();
}
}

// get id generated by zookeeper or generate a local id.
let id = match id {
Some(id) => id.0 as u32,
None => this.next_task_id(&wtxn)?,
};

let mut task = Task {
uid: id,
enqueued_at: OffsetDateTime::now_utc(),
started_at: None,
finished_at: None,
error: None,
canceled_by: None,
details: kind.default_details(),
status: Status::Enqueued,
kind: kind.clone(),
};
// For deletion and cancelation tasks, we want to make extra sure that they
// don't attempt to delete/cancel tasks that are newer than themselves.
filter_out_references_to_newer_tasks(&mut task);
// If the register task is an index swap task, verify that it is well-formed
// (that it does not contain duplicate indexes).
check_index_swap_validity(&task)?;

this.register_raw_task(&mut wtxn, &task)?;

if let Err(e) = wtxn.commit() {
this.delete_persisted_task_data(&task)?;
return Err(e.into());
}

// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
if this
.processing_tasks
.read()
.unwrap()
.must_cancel_processing_tasks(&tasks_to_cancel)
{
this.must_stop_processing.must_stop();
}
}

// notify the scheduler loop to execute a new tick
this.wake_up.signal();

Ok(task)
})
.await
.unwrap()?;
// notify the scheduler loop to execute a new tick
this.wake_up.signal();

// TODO: send task to ZK in raw json.
if let Some(ref zk) = self.zk {
let id = id.unwrap();
if let Some(zookeeper) = &self.zookeeper {
// TODO: ugly unwrap
zk.set_data(
&format!("/tasks/task-{}", id),
&serde_json::to_vec_pretty(&task).unwrap(),
None,
)
.await
.unwrap();
zookeeper
.set_data(
&format!("/tasks/task-{}", id),
serde_json::to_vec_pretty(&task).unwrap(),
None,
)
.unwrap();
}

Ok(task)
@@ -1449,7 +1374,7 @@ impl IndexScheduler {
 /// 6. Reset the in-memory list of processed tasks.
 ///
 /// Returns the number of processed tasks.
-async fn tick(&self) -> Result<TickOutcome> {
+fn tick(&self) -> Result<TickOutcome> {
 #[cfg(test)]
 {
 *self.run_loop_iteration.write().unwrap() += 1;
@@ -1458,7 +1383,7 @@ impl IndexScheduler {

 puffin::GlobalProfiler::lock().new_frame();

-self.cleanup_task_queue().await?;
+self.cleanup_task_queue()?;

 let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
 let batch =
@@ -1597,7 +1522,7 @@ impl IndexScheduler {
 }

 /// Register a task to cleanup the task queue if needed
-async fn cleanup_task_queue(&self) -> Result<()> {
+fn cleanup_task_queue(&self) -> Result<()> {
 let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;

 let nb_tasks = self.all_task_ids(&rtxn)?.len();
@@ -1640,8 +1565,7 @@ impl IndexScheduler {
 delete_before.format(&Rfc3339).map_err(|_| Error::CorruptedTaskQueue)?,
 ),
 tasks: to_delete,
-})
-.await?;
+})?;

 Ok(())
 }