mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-05 02:55:46 +01:00
Mastering minio
This commit is contained in:
parent
5b89276fcc
commit
01c13c98ac
@ -35,6 +35,7 @@ pub type TaskId = u32;
|
|||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::ops::{Bound, RangeBounds};
|
use std::ops::{Bound, RangeBounds};
|
||||||
|
use std::os::fd::AsRawFd;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::atomic::Ordering::Relaxed;
|
use std::sync::atomic::Ordering::Relaxed;
|
||||||
@ -58,7 +59,7 @@ use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
|
|||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use s3::Bucket;
|
use s3::Bucket;
|
||||||
use synchronoise::SignalEvent;
|
use synchronoise::SignalEvent;
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::{NamedTempFile, TempDir};
|
||||||
use time::format_description::well_known::Rfc3339;
|
use time::format_description::well_known::Rfc3339;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
|
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
|
||||||
@ -430,15 +431,17 @@ impl IndexScheduler {
|
|||||||
tempfile::TempDir::new_in(&base_path).unwrap();
|
tempfile::TempDir::new_in(&base_path).unwrap();
|
||||||
|
|
||||||
let src = format!("{snapshot_dir}.indexes");
|
let src = format!("{snapshot_dir}.indexes");
|
||||||
for result in std::fs::read_dir(&src).unwrap() {
|
let uuids =
|
||||||
let entry = result.unwrap();
|
s3.list(src.clone(), None).unwrap().into_iter().flat_map(
|
||||||
let uuid = entry
|
|lbr| {
|
||||||
.file_name()
|
lbr.contents.into_iter().map(|o| {
|
||||||
.as_os_str()
|
let mut iter = o.key.rsplit('.');
|
||||||
.to_str()
|
iter.nth(1).unwrap().to_string()
|
||||||
.unwrap()
|
})
|
||||||
.to_string();
|
},
|
||||||
log::info!("\tDownloading the index {}", uuid.to_string());
|
);
|
||||||
|
for uuid in uuids {
|
||||||
|
log::info!("\tDownloading the index {}", uuid);
|
||||||
std::fs::create_dir_all(
|
std::fs::create_dir_all(
|
||||||
indexes_files.path().join(&uuid).with_extension(""),
|
indexes_files.path().join(&uuid).with_extension(""),
|
||||||
)
|
)
|
||||||
@ -642,7 +645,12 @@ impl IndexScheduler {
|
|||||||
);
|
);
|
||||||
let mut version_file_path =
|
let mut version_file_path =
|
||||||
File::open(&inner.version_file_path).unwrap();
|
File::open(&inner.version_file_path).unwrap();
|
||||||
s3.put_object_stream(&mut version_file_path, dst).unwrap();
|
s3.put_object_stream(&mut version_file_path, dst)
|
||||||
|
.or_else(|e| match e {
|
||||||
|
s3::error::S3Error::Http(404, _) => Ok(200),
|
||||||
|
e => Err(e),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
version_file_path.sync_data().unwrap();
|
version_file_path.sync_data().unwrap();
|
||||||
drop(version_file_path);
|
drop(version_file_path);
|
||||||
|
|
||||||
@ -651,13 +659,21 @@ impl IndexScheduler {
|
|||||||
let env = inner.env.clone();
|
let env = inner.env.clone();
|
||||||
let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}");
|
let snapshot_dir = format!("{zk_snapshots}.{snapshot_id:0>10?}");
|
||||||
|
|
||||||
let mut temp = NamedTempFile::new().unwrap();
|
let temp = TempDir::new().unwrap();
|
||||||
env.copy_to_path(temp.path(), heed::CompactionOption::Enabled)
|
let mut file = env
|
||||||
|
.copy_to_path(
|
||||||
|
temp.path().join("data.mdb"),
|
||||||
|
heed::CompactionOption::Enabled,
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
s3.put_object_stream(
|
s3.put_object_stream(
|
||||||
&mut temp,
|
&mut file,
|
||||||
format!("{snapshot_dir}.tasks.mdb"),
|
format!("{snapshot_dir}.tasks.mdb"),
|
||||||
)
|
)
|
||||||
|
.or_else(|e| match e {
|
||||||
|
s3::error::S3Error::Http(404, _) => Ok(200),
|
||||||
|
e => Err(e),
|
||||||
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
temp.close().unwrap();
|
temp.close().unwrap();
|
||||||
|
|
||||||
@ -670,11 +686,18 @@ impl IndexScheduler {
|
|||||||
log::info!(" Snapshotting index {name}");
|
log::info!(" Snapshotting index {name}");
|
||||||
let dst = dst.clone();
|
let dst = dst.clone();
|
||||||
let index = inner.index_mapper.index(&rtxn, &name).unwrap();
|
let index = inner.index_mapper.index(&rtxn, &name).unwrap();
|
||||||
let mut temp = NamedTempFile::new().unwrap();
|
let temp = TempDir::new().unwrap();
|
||||||
index
|
let mut file = index
|
||||||
.copy_to_path(temp.path(), heed::CompactionOption::Enabled)
|
.copy_to_path(
|
||||||
|
temp.path().join("data.mdb"),
|
||||||
|
heed::CompactionOption::Enabled,
|
||||||
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
s3.put_object_stream(&mut temp, format!("{dst}.{uuid}.mdb"))
|
s3.put_object_stream(&mut file, format!("{dst}.{uuid}.mdb"))
|
||||||
|
.or_else(|e| match e {
|
||||||
|
s3::error::S3Error::Http(404, _) => Ok(200),
|
||||||
|
e => Err(e),
|
||||||
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
temp.close().unwrap();
|
temp.close().unwrap();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user