mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
starts using s3
This commit is contained in:
parent 41697c4d65
commit 5b89276fcc
6 changed files with 224 additions and 61 deletions
@@ -34,6 +34,7 @@ uuid = { version = "1.3.1", features = ["serde", "v4"] }
 tokio = { version = "1.27.0", features = ["full"] }
 zookeeper = "0.8.0"
 parking_lot = "0.12.1"
+rust-s3 = { version = "0.33.0", default-features = false, features = ["sync-rustls-tls"] }
 
 [dev-dependencies]
 big_s = "1.0.2"
@@ -56,7 +56,9 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
 use roaring::RoaringBitmap;
+use s3::Bucket;
 use synchronoise::SignalEvent;
+use tempfile::NamedTempFile;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@@ -277,6 +279,8 @@ pub struct IndexSchedulerOptions {
     pub instance_features: InstanceTogglableFeatures,
     /// zookeeper client
     pub zookeeper: Option<Arc<ZooKeeper>>,
+    /// S3 bucket
+    pub s3: Option<Bucket>,
 }
 
 /// Structure which holds meilisearch's indexes and schedules the tasks
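Note: the commit never shows where this `Bucket` gets built. Below is a minimal sketch of how the `s3: Option<Bucket>` field could be populated with rust-s3 0.33's blocking client (matching the `sync-rustls-tls` feature enabled above); the function name, bucket name, endpoint, region, and env-based credentials are assumptions, not part of the commit:

    use s3::creds::Credentials;
    use s3::{Bucket, Region};

    // Hypothetical constructor for `IndexSchedulerOptions::s3`; every concrete
    // value below (bucket name, endpoint, region) is an assumption.
    fn snapshot_bucket() -> Result<Bucket, Box<dyn std::error::Error>> {
        let region = Region::Custom {
            region: "us-east-1".to_string(),
            endpoint: "http://127.0.0.1:9000".to_string(), // e.g. a local MinIO
        };
        // Reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
        let credentials = Credentials::default()?;
        let bucket = Bucket::new("meilisearch-snapshots", region, credentials)?
            .with_path_style(); // path-style addressing for non-AWS endpoints
        Ok(bucket)
    }

With something like that in hand, the scheduler only needs `s3: Some(snapshot_bucket()?)` in its options.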
@@ -305,6 +309,7 @@ impl IndexScheduler {
 
+        // initialize the directories we need to process batches.
         if let Some(zookeeper) = &inner.zookeeper {
             // Create all the required directories in zookeeper
             match zookeeper.create(
                 "/election",
                 vec![],
@@ -360,9 +365,6 @@ impl IndexScheduler {
 
         let latch = match self.zookeeper {
             Some(ref zookeeper) => {
-                let zk_tasks = format!("{}/zk-tasks", env!("HOME"));
-                std::fs::create_dir_all(&zk_tasks).unwrap();
-
                 let id = Uuid::new_v4().to_string();
                 let latch = LeaderLatch::new(zookeeper.clone(), id, "/election".to_string());
                 let wake_up = self.wake_up.clone();
@@ -401,13 +403,10 @@ impl IndexScheduler {
                     log::info!("Importing snapshot {}", path);
                     let snapshot_id =
                         path.strip_prefix("/snapshots/snapshot-").unwrap();
-                    let snapshot_dir = PathBuf::from(format!(
-                        "{}/zk-snapshots/{}",
-                        env!("HOME"),
-                        snapshot_id
-                    ));
+                    let snapshot_dir = format!("snapshots.{}", snapshot_id);
 
                     let inner = this.inner();
+                    let s3 = inner.options.s3.as_ref().unwrap();
 
                     // 1. TODO: Ensure the snapshot version file is the same as our version.
@@ -418,19 +417,19 @@ impl IndexScheduler {
                         path
                     };
 
-                    let tasks_file =
+                    let mut tasks_file =
                         tempfile::NamedTempFile::new_in(inner.env.path()).unwrap();
 
                     log::info!("Downloading the index scheduler database.");
-                    let tasks_snapshot = snapshot_dir.join("tasks.mdb");
-                    std::fs::copy(&tasks_snapshot, &tasks_file).unwrap();
+                    let tasks_snapshot = format!("{snapshot_dir}/tasks.mdb");
+                    s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)
+                        .unwrap();
 
                     log::info!("Downloading the indexes databases");
                     let indexes_files =
                         tempfile::TempDir::new_in(&base_path).unwrap();
 
                     let mut indexes = Vec::new();
-                    let src = snapshot_dir.join("indexes");
+                    let src = format!("{snapshot_dir}.indexes");
                     for result in std::fs::read_dir(&src).unwrap() {
                         let entry = result.unwrap();
                         let uuid = entry
@@ -444,16 +443,17 @@ impl IndexScheduler {
                             indexes_files.path().join(&uuid).with_extension(""),
                         )
                         .unwrap();
-                        std::fs::copy(
-                            src.join(&uuid).with_extension("mdb"),
-                            indexes_files
-                                .path()
-                                .join(&uuid)
-                                .with_extension("")
-                                .join("data.mdb"),
+                        let path = indexes_files
+                            .path()
+                            .join(&uuid)
+                            .with_extension("")
+                            .join("data.mdb");
+                        let mut file = File::create(path).unwrap();
+                        s3.get_object_to_writer(
+                            format!("{src}.{uuid}.mdb"),
+                            &mut file,
                         )
                         .unwrap();
                         indexes.push(uuid);
                     }
 
                     // 3. Lock the index-mapper and close all the env
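Every download site above follows the same shape: create a local file, then let rust-s3's blocking `get_object_to_writer` stream the object's bytes into it. A self-contained sketch of that pattern (the helper name, key, and destination path are placeholders, not from the commit):

    use std::fs::File;
    use std::path::Path;

    use s3::Bucket;

    // Sketch of the download pattern: stream an S3 object straight into a
    // freshly created local file.
    fn download_to_file(
        bucket: &Bucket,
        key: &str,
        dst: &Path,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut file = File::create(dst)?;
        bucket.get_object_to_writer(key, &mut file)?;
        file.sync_all()?; // make sure the bytes hit disk before LMDB opens them
        Ok(())
    }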
@@ -529,11 +529,11 @@ impl IndexScheduler {
                             .rsplit_once('-')
                             .map(|(_, id)| id.parse::<u32>().unwrap())
                             .unwrap();
-                        let task_path = Path::new(std::env!("HOME"))
-                            .join("zk-tasks")
-                            .join(format!("{:0>10}", id));
-                        let file = File::open(task_path).unwrap();
-                        let task = serde_json::from_reader(file).unwrap();
+                        let s3 = inner.options.s3.as_ref().unwrap();
+                        let task =
+                            s3.get_object(format!("tasks.{id:0>10}")).unwrap();
+
+                        let task = serde_json::from_slice(task.as_slice()).unwrap();
                         inner.register_raw_task(&mut wtxn, &task).unwrap();
                         // we received a new tasks, we must wake up
                         self.wake_up.signal();
@@ -569,11 +569,9 @@ impl IndexScheduler {
                         .rsplit_once('-')
                         .map(|(_, id)| id.parse::<u32>().unwrap())
                         .unwrap();
-                    let path = Path::new(env!("HOME"))
-                        .join("zk-tasks")
-                        .join(format!("{:0>10?}", id));
-                    let file = File::open(path).unwrap();
-                    let task = serde_json::from_reader(file).unwrap();
+                    let s3 = inner.options.s3.as_ref().unwrap();
+                    let task = s3.get_object(format!("tasks.{id:0>10}")).unwrap();
+                    let task = serde_json::from_slice(task.as_slice()).unwrap();
                     inner.register_raw_task(&mut wtxn, &task).unwrap();
                     wtxn.commit().unwrap();
                 }
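Both watchers above pull the payload back with `get_object` and deserialize it from memory, mirroring the `put_object` call in the registration hunk at the end of this diff. A minimal round-trip sketch for the `tasks.{id:0>10}` key scheme (the helper and payload are illustrative, not from the commit):

    use s3::error::S3Error;
    use s3::Bucket;

    // Round-trip a raw task payload through S3. Zero-padding the id keeps the
    // keys lexicographically ordered, which matches numeric task order.
    fn roundtrip_task(bucket: &Bucket, id: u32, payload: &[u8]) -> Result<Vec<u8>, S3Error> {
        let key = format!("tasks.{id:0>10}");
        bucket.put_object(&key, payload)?;
        let response = bucket.get_object(&key)?;
        Ok(response.as_slice().to_vec())
    }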
@@ -632,49 +630,53 @@ impl IndexScheduler {
                     .map(|(_, id)| id.parse::<u32>().unwrap())
                     .unwrap();
 
-                let zk_snapshots = format!("{}/zk-snapshots", env!("HOME"));
-                std::fs::create_dir_all(&zk_snapshots).unwrap();
-                let snapshot_dir =
-                    PathBuf::from(format!("{zk_snapshots}/{snapshot_id:0>10?}"));
-                std::fs::create_dir(&snapshot_dir).unwrap();
+                let zk_snapshots = format!("snapshots");
+                let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}");
 
+                let s3 = inner.options.s3.as_ref().unwrap();
 
                 // 1. Snapshot the version file.
-                let dst = snapshot_dir.join(meilisearch_types::VERSION_FILE_NAME);
-                std::fs::copy(&inner.version_file_path, dst).unwrap();
+                let dst = format!(
+                    "{snapshot_dir}.{}",
+                    meilisearch_types::VERSION_FILE_NAME
+                );
+                let mut version_file_path =
+                    File::open(&inner.version_file_path).unwrap();
+                s3.put_object_stream(&mut version_file_path, dst).unwrap();
+                version_file_path.sync_data().unwrap();
+                drop(version_file_path);
 
                 // 2. Snapshot the index-scheduler LMDB env
                 log::info!("Snapshotting the tasks");
                 let env = inner.env.clone();
-                env.copy_to_path(
-                    snapshot_dir.join("tasks.mdb"),
-                    heed::CompactionOption::Enabled,
-                )
-                .unwrap();
+                let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}");
+
+                let mut temp = NamedTempFile::new().unwrap();
+                env.copy_to_path(temp.path(), heed::CompactionOption::Enabled)
+                    .unwrap();
+                s3.put_object_stream(
+                    &mut temp,
+                    format!("{snapshot_dir}.tasks.mdb"),
+                )
+                .unwrap();
+                temp.close().unwrap();
 
                 // 3. Snapshot every indexes
                 log::info!("Snapshotting the indexes");
-                let dst = snapshot_dir.join("indexes");
-                std::fs::create_dir_all(&dst).unwrap();
+                let dst = format!("{snapshot_dir}.indexes");
 
-                let indexes = inner
-                    .index_mapper
-                    .index_mapping
-                    .iter(&rtxn)
-                    .unwrap()
-                    .map(|ret| ret.unwrap())
-                    .map(|(name, uuid)| (name.to_string(), uuid))
-                    .collect::<Vec<_>>();
-
-                for (name, uuid) in indexes {
+                for ret in inner.index_mapper.index_mapping.iter(&rtxn).unwrap() {
+                    let (name, uuid) = ret.unwrap();
                     log::info!("  Snapshotting index {name}");
+                    let dst = dst.clone();
                     let index = inner.index_mapper.index(&rtxn, &name).unwrap();
+                    let mut temp = NamedTempFile::new().unwrap();
                     index
-                        .copy_to_path(
-                            dst.join(format!("{uuid}.mdb")),
-                            heed::CompactionOption::Enabled,
-                        )
+                        .copy_to_path(temp.path(), heed::CompactionOption::Enabled)
                         .unwrap();
+                    s3.put_object_stream(&mut temp, format!("{dst}.{uuid}.mdb"))
+                        .unwrap();
+                    temp.close().unwrap();
                 }
 
                 // we must notify everyone that we dropped a new snapshot on the s3
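The snapshot loop above repeats one pattern: compact an LMDB environment into a temp file with heed's `copy_to_path`, then hand that file to rust-s3's blocking `put_object_stream`. A condensed sketch of that pattern (the helper name and `?`-based error handling are liberties; the commit itself uses `unwrap`):

    use std::io::{Seek, SeekFrom};

    use s3::Bucket;
    use tempfile::NamedTempFile;

    // Compact one LMDB env into a temp file and stream it to S3.
    fn snapshot_env(
        bucket: &Bucket,
        env: &heed::Env,
        key: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut temp = NamedTempFile::new()?;
        // `copy_to_path` writes through its own file handle, so `temp`'s
        // cursor is still at the start; the rewind makes that explicit.
        env.copy_to_path(temp.path(), heed::CompactionOption::Enabled)?;
        temp.seek(SeekFrom::Start(0))?;
        bucket.put_object_stream(&mut temp, key)?;
        temp.close()?;
        Ok(())
    }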
@@ -1476,10 +1478,9 @@ impl IndexSchedulerInner {
 
         // TODO: send task to ZK in raw json.
         if let Some(zookeeper) = &self.zookeeper {
-            std::fs::write(
-                Path::new(std::env!("HOME")).join("zk-tasks").join(format!("{:0>10?}", id)),
-                serde_json::to_vec_pretty(&task).unwrap(),
-            )?;
+            let s3 = self.options.s3.as_ref().unwrap();
+            s3.put_object(format!("tasks.{id:0>10?}"), &serde_json::to_vec_pretty(&task).unwrap())
+                .unwrap();
 
             // TODO: ugly unwrap
             zookeeper.set_data(&format!("/tasks/task-{:0>10?}", id), b"ok".to_vec(), None).unwrap();
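Worth noting: registration uploads the payload to S3 first and only then touches the ZooKeeper node, so a watcher woken by `set_data` can always find the object it is about to fetch. A compressed sketch of that ordering, using only the calls shown in the diff (it assumes, like the commit, that the `/tasks/task-…` znode already exists, since `set_data` does not create it):

    use std::sync::Arc;

    use s3::Bucket;
    use zookeeper::ZooKeeper;

    // Persist the task payload before signalling watchers, so nobody is
    // woken up for an object that is not in the bucket yet.
    fn publish_task(s3: &Bucket, zookeeper: &Arc<ZooKeeper>, id: u32, payload: &[u8]) {
        s3.put_object(format!("tasks.{id:0>10?}"), payload).unwrap();
        zookeeper
            .set_data(&format!("/tasks/task-{id:0>10?}"), b"ok".to_vec(), None)
            .unwrap();
    }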