first mostly working version

This commit is contained in:
Tamo 2022-10-16 01:39:01 +02:00 committed by Clément Renault
parent c051166bcc
commit d976e680c5
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
18 changed files with 403 additions and 57 deletions

1
Cargo.lock generated
View File

@ -2284,6 +2284,7 @@ dependencies = [
"cargo_toml", "cargo_toml",
"clap 4.0.9", "clap 4.0.9",
"crossbeam-channel", "crossbeam-channel",
"dump",
"either", "either",
"env_logger", "env_logger",
"file-store", "file-store",

View File

@ -1,8 +1,10 @@
use meilisearch_types::{ use meilisearch_types::{
error::ResponseError, error::ResponseError,
keys::Key,
milli::update::IndexDocumentsMethod, milli::update::IndexDocumentsMethod,
settings::Unchecked, settings::Unchecked,
tasks::{Details, KindWithContent, Status, Task, TaskId}, tasks::{Details, KindWithContent, Status, Task, TaskId},
InstanceUid,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
@ -12,7 +14,7 @@ mod reader;
mod writer; mod writer;
pub use error::Error; pub use error::Error;
pub use reader::DumpReader; pub use reader::{DumpReader, UpdateFile};
pub use writer::DumpWriter; pub use writer::DumpWriter;
const CURRENT_DUMP_VERSION: Version = Version::V6; const CURRENT_DUMP_VERSION: Version = Version::V6;
@ -49,14 +51,13 @@ pub enum Version {
V6, V6,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct TaskDump { pub struct TaskDump {
pub uid: TaskId, pub uid: TaskId,
#[serde(default)] #[serde(default)]
pub index_uid: Option<String>, pub index_uid: Option<String>,
pub status: Status, pub status: Status,
// TODO use our own Kind for the user
#[serde(rename = "type")] #[serde(rename = "type")]
pub kind: KindDump, pub kind: KindDump,
@ -82,7 +83,7 @@ pub struct TaskDump {
} }
// A `Kind` specific version made for the dump. If modified you may break the dump. // A `Kind` specific version made for the dump. If modified you may break the dump.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub enum KindDump { pub enum KindDump {
DocumentImport { DocumentImport {
@ -118,7 +119,9 @@ pub enum KindDump {
query: String, query: String,
tasks: Vec<TaskId>, tasks: Vec<TaskId>,
}, },
DumpExport, DumpExport {
dump_uid: String,
},
Snapshot, Snapshot,
} }
@ -177,7 +180,7 @@ impl From<KindWithContent> for KindDump {
KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs }, KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs },
KindWithContent::CancelTask { tasks } => KindDump::CancelTask { tasks }, KindWithContent::CancelTask { tasks } => KindDump::CancelTask { tasks },
KindWithContent::DeleteTasks { query, tasks } => KindDump::DeleteTasks { query, tasks }, KindWithContent::DeleteTasks { query, tasks } => KindDump::DeleteTasks { query, tasks },
KindWithContent::DumpExport { .. } => KindDump::DumpExport, KindWithContent::DumpExport { dump_uid, .. } => KindDump::DumpExport { dump_uid },
KindWithContent::Snapshot => KindDump::Snapshot, KindWithContent::Snapshot => KindDump::Snapshot,
} }
} }
@ -206,8 +209,7 @@ pub(crate) mod test {
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
reader::{self, Document}, reader::Document, DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version,
DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version,
}; };
pub fn create_test_instance_uid() -> Uuid { pub fn create_test_instance_uid() -> Uuid {

View File

@ -116,7 +116,9 @@ impl CompatV5ToV6 {
allow_index_creation, allow_index_creation,
settings: settings.into(), settings: settings.into(),
}, },
v5::tasks::TaskContent::Dump { .. } => v6::Kind::DumpExport, v5::tasks::TaskContent::Dump { uid } => {
v6::Kind::DumpExport { dump_uid: uid }
}
}, },
details: task_view.details.map(|details| match details { details: task_view.details.map(|details| match details {
v5::Details::DocumentAddition { v5::Details::DocumentAddition {
@ -412,7 +414,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0fff3c32487e3d3058d51ed951c1057f"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"84d5b8eb31735d643483fcee28080edf");
assert_eq!(update_files.len(), 22); assert_eq!(update_files.len(), 22);
assert!(update_files[0].is_none()); // the dump creation assert!(update_files[0].is_none()); // the dump creation
assert!(update_files[1].is_some()); // the enqueued document addition assert!(update_files[1].is_some()); // the enqueued document addition

View File

@ -203,7 +203,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0fff3c32487e3d3058d51ed951c1057f"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"84d5b8eb31735d643483fcee28080edf");
assert_eq!(update_files.len(), 22); assert_eq!(update_files.len(), 22);
assert!(update_files[0].is_none()); // the dump creation assert!(update_files[0].is_none()); // the dump creation
assert!(update_files[1].is_some()); // the enqueued document addition assert!(update_files[1].is_some()); // the enqueued document addition

View File

@ -109,7 +109,7 @@ impl V6Reader {
&mut self, &mut self,
) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> { ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
let task: Task = serde_json::from_str(&line?)?; let task: Task = serde_json::from_str(&dbg!(line?)).unwrap();
let update_file_path = self let update_file_path = self
.dump .dump
@ -121,7 +121,8 @@ impl V6Reader {
if update_file_path.exists() { if update_file_path.exists() {
Ok(( Ok((
task, task,
Some(Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>), Some(Box::new(UpdateFile::new(&update_file_path).unwrap())
as Box<super::UpdateFile>),
)) ))
} else { } else {
Ok((task, None)) Ok((task, None))

View File

@ -71,24 +71,26 @@ impl DumpWriter {
} }
pub struct KeyWriter { pub struct KeyWriter {
file: File, keys: BufWriter<File>,
} }
impl KeyWriter { impl KeyWriter {
pub(crate) fn new(path: PathBuf) -> Result<Self> { pub(crate) fn new(path: PathBuf) -> Result<Self> {
let file = File::create(path.join("keys.jsonl"))?; let keys = File::create(path.join("keys.jsonl"))?;
Ok(KeyWriter { file }) Ok(KeyWriter {
keys: BufWriter::new(keys),
})
} }
pub fn push_key(&mut self, key: &Key) -> Result<()> { pub fn push_key(&mut self, key: &Key) -> Result<()> {
self.file.write_all(&serde_json::to_vec(key)?)?; self.keys.write_all(&serde_json::to_vec(key)?)?;
self.file.write_all(b"\n")?; self.keys.write_all(b"\n")?;
Ok(()) Ok(())
} }
} }
pub struct TaskWriter { pub struct TaskWriter {
queue: File, queue: BufWriter<File>,
update_files: PathBuf, update_files: PathBuf,
} }
@ -101,7 +103,7 @@ impl TaskWriter {
std::fs::create_dir(&update_files)?; std::fs::create_dir(&update_files)?;
Ok(TaskWriter { Ok(TaskWriter {
queue, queue: BufWriter::new(queue),
update_files, update_files,
}) })
} }
@ -111,6 +113,7 @@ impl TaskWriter {
pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> { pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
self.queue.write_all(&serde_json::to_vec(task)?)?; self.queue.write_all(&serde_json::to_vec(task)?)?;
self.queue.write_all(b"\n")?; self.queue.write_all(b"\n")?;
self.queue.flush()?;
Ok(UpdateFile::new( Ok(UpdateFile::new(
self.update_files.join(format!("{}.jsonl", task.uid)), self.update_files.join(format!("{}.jsonl", task.uid)),

View File

@ -736,7 +736,7 @@ impl IndexScheduler {
let user_result = match user_result { let user_result = match user_result {
Ok(count) => Ok(DocumentAdditionResult { Ok(count) => Ok(DocumentAdditionResult {
indexed_documents: count, indexed_documents: count,
number_of_documents: count, number_of_documents: count, // TODO: this is wrong, we should use the value stored in the Details.
}), }),
Err(e) => Err(milli::Error::from(e)), Err(e) => Err(milli::Error::from(e)),
}; };

View File

@ -13,6 +13,8 @@ pub enum Error {
IndexAlreadyExists(String), IndexAlreadyExists(String),
#[error("Corrupted task queue.")] #[error("Corrupted task queue.")]
CorruptedTaskQueue, CorruptedTaskQueue,
#[error("Corrupted dump.")]
CorruptedDump,
#[error("Task `{0}` not found")] #[error("Task `{0}` not found")]
TaskNotFound(TaskId), TaskNotFound(TaskId),
// TODO: Lo: proper error message for this // TODO: Lo: proper error message for this
@ -49,14 +51,15 @@ impl ErrorCode for Error {
Error::InvalidStatus(_) => Code::BadRequest, Error::InvalidStatus(_) => Code::BadRequest,
Error::InvalidKind(_) => Code::BadRequest, Error::InvalidKind(_) => Code::BadRequest,
// TODO: TAMO: are all these errors really internal?
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli(e) => e.error_code(),
// TODO: TAMO: are all these errors really internal?
Error::Heed(_) => Code::Internal, Error::Heed(_) => Code::Internal,
Error::FileStore(_) => Code::Internal, Error::FileStore(_) => Code::Internal,
Error::IoError(_) => Code::Internal, Error::IoError(_) => Code::Internal,
Error::Anyhow(_) => Code::Internal, Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal, Error::CorruptedTaskQueue => Code::Internal,
Error::CorruptedDump => Code::Internal,
} }
} }
} }

View File

@ -28,7 +28,7 @@ pub struct IndexMapper {
base_path: PathBuf, base_path: PathBuf,
index_size: usize, index_size: usize,
indexer_config: Arc<IndexerConfig>, pub indexer_config: Arc<IndexerConfig>,
} }
/// Weither the index must not be inserted back /// Weither the index must not be inserted back

View File

@ -9,13 +9,17 @@ mod utils;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32; pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error; pub use error::Error;
use meilisearch_types::keys::Key;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use meilisearch_types::InstanceUid;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use file_store::{File, FileStore}; use file_store::FileStore;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -220,10 +224,6 @@ impl IndexScheduler {
Ok(this) Ok(this)
} }
pub fn import_dump(&self, dump_path: PathBuf) -> Result<()> {
todo!()
}
/// This function will execute in a different thread and must be called only once. /// This function will execute in a different thread and must be called only once.
fn run(&self) { fn run(&self) {
let run = Self { let run = Self {
@ -254,6 +254,10 @@ impl IndexScheduler {
}); });
} }
pub fn indexer_config(&self) -> &IndexerConfig {
&self.index_mapper.indexer_config
}
/// Return the index corresponding to the name. If it wasn't opened before /// Return the index corresponding to the name. If it wasn't opened before
/// it'll be opened. But if it doesn't exist on disk it'll throw an /// it'll be opened. But if it doesn't exist on disk it'll throw an
/// `IndexNotFound` error. /// `IndexNotFound` error.
@ -390,11 +394,138 @@ impl IndexScheduler {
Ok(task) Ok(task)
} }
pub fn create_update_file(&self) -> Result<(Uuid, File)> { /// Register a new task comming from a dump in the scheduler.
/// By takinig a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumpped_task(
&mut self,
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
keys: &[Key],
instance_uid: Option<InstanceUid>,
) -> Result<Task> {
// Currently we don't need to access the tasks queue while loading a dump thus I can block everything.
let mut wtxn = self.env.write_txn()?;
let content_uuid = if let Some(content_file) = content_file {
let (uuid, mut file) = self.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
} else {
None
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentImport {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings {
settings,
is_deletion,
allow_index_creation,
} => KindWithContent::Settings {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
},
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
KindDump::CancelTask { tasks } => KindWithContent::CancelTask { tasks },
KindDump::DeleteTasks { query, tasks } => {
KindWithContent::DeleteTasks { query, tasks }
}
KindDump::DumpExport { dump_uid } => KindWithContent::DumpExport {
dump_uid,
keys: keys.to_vec(),
instance_uid,
},
KindDump::Snapshot => KindWithContent::Snapshot,
},
};
self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})?;
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})?;
match wtxn.commit() {
Ok(()) => (),
_e @ Err(_) => {
todo!("remove the data associated with the task");
// _e?;
}
}
Ok(task)
}
/// Create a new index without any associated task.
pub fn create_raw_index(&self, name: &str) -> Result<Index> {
let mut wtxn = self.env.write_txn()?;
self.index_mapper.create_index(&mut wtxn, name)
}
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update()?) Ok(self.file_store.new_update()?)
} }
#[cfg(test)] #[cfg(test)]
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, File)> { pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update_with_uuid(uuid)?) Ok(self.file_store.new_update_with_uuid(uuid)?)
} }

View File

@ -165,6 +165,17 @@ impl AuthController {
None => Ok(false), None => Ok(false),
} }
} }
/// Delete all the keys in the DB.
pub fn raw_delete_all_keys(&mut self) -> Result<()> {
self.store.delete_all_keys()
}
/// Delete all the keys in the DB.
pub fn raw_insert_key(&mut self, key: Key) -> Result<()> {
self.store.put_api_key(key)?;
Ok(())
}
} }
pub struct AuthFilter { pub struct AuthFilter {

View File

@ -197,6 +197,13 @@ impl HeedAuthStore {
Ok(existing) Ok(existing)
} }
pub fn delete_all_keys(&self) -> Result<()> {
let mut wtxn = self.env.write_txn()?;
self.keys.clear(&mut wtxn)?;
wtxn.commit()?;
Ok(())
}
pub fn list_api_keys(&self) -> Result<Vec<Key>> { pub fn list_api_keys(&self) -> Result<Vec<Key>> {
let mut list = Vec::new(); let mut list = Vec::new();
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;

View File

@ -34,6 +34,7 @@ byte-unit = { version = "4.0.14", default-features = false, features = ["std", "
bytes = "1.2.1" bytes = "1.2.1"
clap = { version = "4.0.9", features = ["derive", "env"] } clap = { version = "4.0.9", features = ["derive", "env"] }
crossbeam-channel = "0.5.6" crossbeam-channel = "0.5.6"
dump = { path = "../dump" }
either = "1.8.0" either = "1.8.0"
env_logger = "0.9.1" env_logger = "0.9.1"
flate2 = "1.0.24" flate2 = "1.0.24"

View File

@ -13,14 +13,28 @@ pub mod metrics;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
pub mod route_metrics; pub mod route_metrics;
use std::sync::{atomic::AtomicBool, Arc}; use std::{
fs::File,
io::{BufReader, BufWriter, Seek, SeekFrom},
path::Path,
sync::{atomic::AtomicBool, Arc},
};
use crate::error::MeilisearchHttpError; use crate::error::MeilisearchHttpError;
use actix_web::error::JsonPayloadError; use actix_web::error::JsonPayloadError;
use actix_web::web::Data; use actix_web::web::Data;
use analytics::Analytics; use analytics::Analytics;
use anyhow::bail;
use error::PayloadError; use error::PayloadError;
use http::header::CONTENT_TYPE; use http::header::CONTENT_TYPE;
use meilisearch_types::{
milli::{
self,
documents::{DocumentsBatchBuilder, DocumentsBatchReader},
update::{IndexDocumentsConfig, IndexDocumentsMethod},
},
settings::apply_settings_to_builder,
};
pub use option::Opt; pub use option::Opt;
use actix_web::{web, HttpRequest}; use actix_web::{web, HttpRequest};
@ -31,19 +45,83 @@ use meilisearch_auth::AuthController;
pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false); pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);
/// Check if a db is empty. It does not provide any information on the
/// validity of the data in it.
/// We consider a database as non empty when it's a non empty directory.
fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
let db_path = db_path.as_ref();
if !db_path.exists() {
true
// if we encounter an error or if the db is a file we consider the db non empty
} else if let Ok(dir) = db_path.read_dir() {
dir.count() == 0
} else {
true
}
}
// TODO: TAMO: Finish setting up things // TODO: TAMO: Finish setting up things
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<IndexScheduler> { pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthController)> {
let meilisearch = IndexScheduler::new( // we don't want to create anything in the data.ms yet, thus we
opt.db_path.join("tasks"), // wrap our two builders in a closure that'll be executed later.
opt.db_path.join("update_files"), let auth_controller_builder = || AuthController::new(&opt.db_path, &opt.master_key);
opt.db_path.join("indexes"),
opt.dumps_dir.clone(), let index_scheduler_builder = || {
opt.max_index_size.get_bytes() as usize, IndexScheduler::new(
(&opt.indexer_options).try_into()?, opt.db_path.join("tasks"),
true, opt.db_path.join("update_files"),
#[cfg(test)] opt.db_path.join("indexes"),
todo!("We'll see later"), opt.dumps_dir.clone(),
)?; opt.max_index_size.get_bytes() as usize,
(&opt.indexer_options).try_into()?,
true,
#[cfg(test)]
todo!("We'll see later"),
)
};
let (index_scheduler, auth_controller) = if let Some(ref _path) = opt.import_snapshot {
// handle the snapshot with something akin to the dumps
// + the snapshot interval / spawning a thread
todo!();
} else if let Some(ref path) = opt.import_dump {
let empty_db = is_empty_db(&opt.db_path);
let src_path_exists = path.exists();
if empty_db && src_path_exists {
let mut index_scheduler = index_scheduler_builder()?;
let mut auth_controller = auth_controller_builder()?;
import_dump(
&opt.db_path,
path,
&mut index_scheduler,
&mut auth_controller,
)?;
(index_scheduler, auth_controller)
} else if !empty_db && !opt.ignore_dump_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
opt.db_path
.canonicalize()
.unwrap_or_else(|_| opt.db_path.to_owned())
)
} else if !src_path_exists && !opt.ignore_missing_dump {
bail!("dump doesn't exist at {:?}", path)
} else {
let mut index_scheduler = index_scheduler_builder()?;
let mut auth_controller = auth_controller_builder()?;
import_dump(
&opt.db_path,
path,
&mut index_scheduler,
&mut auth_controller,
)?;
(index_scheduler, auth_controller)
}
} else {
(index_scheduler_builder()?, auth_controller_builder()?)
};
/* /*
TODO: We should start a thread to handle the snapshots. TODO: We should start a thread to handle the snapshots.
@ -53,25 +131,125 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<IndexScheduler> {
.set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists) .set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists)
.set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec)) .set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec))
.set_snapshot_dir(opt.snapshot_dir.clone()) .set_snapshot_dir(opt.snapshot_dir.clone())
// dump
.set_ignore_missing_dump(opt.ignore_missing_dump)
.set_ignore_dump_if_db_exists(opt.ignore_dump_if_db_exists)
.set_dump_dst(opt.dumps_dir.clone());
if let Some(ref path) = opt.import_snapshot { if let Some(ref path) = opt.import_snapshot {
meilisearch.set_import_snapshot(path.clone()); meilisearch.set_import_snapshot(path.clone());
} }
if let Some(ref path) = opt.import_dump {
meilisearch.set_dump_src(path.clone());
}
if opt.schedule_snapshot { if opt.schedule_snapshot {
meilisearch.set_schedule_snapshot(); meilisearch.set_schedule_snapshot();
} }
*/ */
Ok(meilisearch) Ok((index_scheduler, auth_controller))
}
fn import_dump(
db_path: &Path,
dump_path: &Path,
index_scheduler: &mut IndexScheduler,
auth: &mut AuthController,
) -> Result<(), anyhow::Error> {
let reader = File::open(dump_path)?;
let mut dump_reader = dump::DumpReader::open(reader)?;
if let Some(date) = dump_reader.date() {
log::info!(
"Importing a dump of meilisearch `{:?}` from the {}",
dump_reader.version(), // TODO: get the meilisearch version instead of the dump version
date
);
} else {
log::info!(
"Importing a dump of meilisearch `{:?}`",
dump_reader.version(), // TODO: get the meilisearch version instead of the dump version
);
}
let instance_uid = dump_reader.instance_uid()?;
// 1. Import the instance-uid.
if let Some(ref instance_uid) = instance_uid {
// we don't want to panic if there is an error with the instance-uid.
let _ = std::fs::write(
db_path.join("instance-uid"),
instance_uid.to_string().as_bytes(),
);
};
// 2. Import the `Key`s.
let mut keys = Vec::new();
auth.raw_delete_all_keys()?;
for key in dump_reader.keys() {
let key = key?;
auth.raw_insert_key(key.clone())?;
keys.push(key);
}
// 3. Import the tasks.
for ret in dump_reader.tasks() {
let (task, file) = ret?;
index_scheduler.register_dumpped_task(task, file, &keys, instance_uid)?;
}
let indexer_config = index_scheduler.indexer_config();
// 4. Import the indexes.
for index_reader in dump_reader.indexes()? {
let mut index_reader = index_reader?;
let metadata = index_reader.metadata();
log::info!("Importing index `{}`.", metadata.uid);
let index = index_scheduler.create_raw_index(&metadata.uid)?;
let mut wtxn = index.write_txn()?;
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
// 4.1 Import the primary key if there is one.
if let Some(ref primary_key) = metadata.primary_key {
builder.set_primary_key(primary_key.to_string());
}
// 4.2 Import the settings.
log::info!("Importing the settings.");
let settings = index_reader.settings()?;
apply_settings_to_builder(&settings, &mut builder);
builder.execute(|indexing_step| {
log::debug!("update: {:?}", indexing_step);
})?;
// 4.3 Import the documents.
// 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
log::info!("Importing the documents.");
let mut file = tempfile::tempfile()?;
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(&mut file));
for document in index_reader.documents()? {
builder.append_json_object(&document?)?;
}
builder.into_inner()?; // this actually flush the content of the batch builder.
// 4.3.2 We feed it to the milli index.
file.seek(SeekFrom::Start(0))?;
let reader = BufReader::new(file);
let reader = DocumentsBatchReader::from_reader(reader)?;
let builder = milli::update::IndexDocuments::new(
&mut wtxn,
&index,
indexer_config,
IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
},
|indexing_step| log::debug!("update: {:?}", indexing_step),
)?;
let (builder, user_result) = builder.add_documents(reader)?;
log::info!("{} documents found.", user_result?);
builder.execute()?;
wtxn.commit()?;
log::info!("All documents successfully imported.");
}
Ok(())
} }
pub fn configure_data( pub fn configure_data(

View File

@ -48,9 +48,13 @@ async fn main() -> anyhow::Result<()> {
_ => unreachable!(), _ => unreachable!(),
} }
let index_scheduler = setup_meilisearch(&opt)?; let (index_scheduler, auth_controller) = match setup_meilisearch(&opt) {
Ok(ret) => ret,
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?; Err(e) => {
std::fs::remove_dir_all(opt.db_path)?;
return Err(e);
}
};
#[cfg(all(not(debug_assertions), feature = "analytics"))] #[cfg(all(not(debug_assertions), feature = "analytics"))]
let analytics = if !opt.no_analytics { let analytics = if !opt.no_analytics {

View File

@ -242,7 +242,9 @@ async fn document_addition(
let (uuid, mut update_file) = index_scheduler.create_update_file()?; let (uuid, mut update_file) = index_scheduler.create_update_file()?;
// TODO: this can be slow, maybe we should spawn a thread? But the payload isn't Send+Sync :weary:
// push the entire stream into a `Vec`. // push the entire stream into a `Vec`.
// If someone sends us a never ending stream we're going to block the thread.
// TODO: Maybe we should write it to a file to reduce the RAM consumption // TODO: Maybe we should write it to a file to reduce the RAM consumption
// and then reread it to convert it to obkv? // and then reread it to convert it to obkv?
let mut buffer = Vec::new(); let mut buffer = Vec::new();

View File

@ -14,7 +14,7 @@ type Result<T> = std::result::Result<T, Error>;
pub type KeyId = Uuid; pub type KeyId = Uuid;
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct Key { pub struct Key {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>, pub description: Option<String>,

View File

@ -7,7 +7,7 @@ use std::str::FromStr;
/// A type that tries to match either a star (*) or /// A type that tries to match either a star (*) or
/// any other thing that implements `FromStr`. /// any other thing that implements `FromStr`.
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum StarOr<T> { pub enum StarOr<T> {
Star, Star,
Other(T), Other(T),