diff --git a/Cargo.lock b/Cargo.lock
index bee967723..017257512 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -359,9 +359,9 @@ dependencies = [
 
 [[package]]
 name = "anyhow"
-version = "1.0.72"
+version = "1.0.75"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854"
+checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
 dependencies = [
  "backtrace",
 ]
@@ -1114,10 +1114,11 @@ dependencies = [
 
 [[package]]
 name = "deranged"
-version = "0.3.7"
+version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929"
+checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3"
 dependencies = [
+ "powerfmt",
  "serde",
 ]
@@ -1276,7 +1277,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time",
- "uuid 1.4.1",
+ "uuid 1.5.0",
 ]
 
 [[package]]
@@ -1478,7 +1479,7 @@ dependencies = [
  "faux",
  "tempfile",
  "thiserror",
- "uuid 1.4.1",
+ "uuid 1.5.0",
 ]
 
 [[package]]
@@ -2465,7 +2466,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time",
- "uuid 1.4.1",
+ "uuid 1.5.0",
 ]
 
 [[package]]
@@ -3153,7 +3154,7 @@ dependencies = [
  "tokio-stream",
  "toml",
  "urlencoding",
- "uuid 1.4.1",
+ "uuid 1.5.0",
  "vergen",
  "walkdir",
  "yaup",
@@ -3176,7 +3177,7 @@ dependencies = [
  "sha2",
  "thiserror",
  "time",
- "uuid 1.4.1",
+ "uuid 1.5.0",
 ]
 
 [[package]]
@@ -3206,7 +3207,21 @@ dependencies = [
  "thiserror",
  "time",
  "tokio",
- "uuid 1.4.1",
+ "uuid 1.5.0",
+]
+
+[[package]]
+name = "meilitool"
+version = "1.5.0"
+dependencies = [
+ "anyhow",
+ "clap",
+ "dump",
+ "file-store",
+ "meilisearch-auth",
+ "meilisearch-types",
+ "time",
+ "uuid 1.5.0",
 ]
 
 [[package]]
@@ -3286,7 +3301,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time",
- "uuid 1.4.1",
+ "uuid 1.5.0",
 ]
 
 [[package]]
@@ -3717,6 +3732,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "powerfmt"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.17"
@@ -4185,9 +4206,9 @@ checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
 
 [[package]]
 name = "serde"
-version = "1.0.183"
+version = "1.0.190"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
+checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
 dependencies = [
  "serde_derive",
 ]
@@ -4212,9 +4233,9 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.183"
+version = "1.0.190"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
+checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4559,12 +4580,13 @@ dependencies = [
 
 [[package]]
 name = "time"
-version = "0.3.25"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea"
+checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5"
 dependencies = [
  "deranged",
  "itoa",
+ "powerfmt",
  "serde",
  "time-core",
  "time-macros",
@@ -4572,15 +4594,15 @@ dependencies = [
 
 [[package]]
 name = "time-core"
-version = "0.1.1"
+version = "0.1.2"
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.11" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb71511c991639bb078fd5bf97757e03914361c48100d52878b8e52b46fb92cd" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -4901,9 +4923,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ "getrandom", "serde", diff --git a/Cargo.toml b/Cargo.toml index cc16d50db..7b8fab8e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "meilisearch", + "meilitool", "meilisearch-types", "meilisearch-auth", "meili-snap", diff --git a/Dockerfile b/Dockerfile index 70950f338..bf98cbeca 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ FROM rust:alpine3.16 AS compiler RUN apk add -q --update-cache --no-cache build-base openssl-dev -WORKDIR /meilisearch +WORKDIR / ARG COMMIT_SHA ARG COMMIT_DATE @@ -17,7 +17,7 @@ RUN set -eux; \ if [ "$apkArch" = "aarch64" ]; then \ export JEMALLOC_SYS_WITH_LG_PAGE=16; \ fi && \ - cargo build --release + cargo build --release -p meilisearch -p meilitool # Run FROM alpine:3.16 @@ -28,9 +28,10 @@ ENV MEILI_SERVER_PROVIDER docker RUN apk update --quiet \ && apk add -q --no-cache libgcc tini curl -# add meilisearch to the `/bin` so you can run it from anywhere and it's easy -# to find. -COPY --from=compiler /meilisearch/target/release/meilisearch /bin/meilisearch +# add meilisearch and meilitool to the `/bin` so you can run it from anywhere +# and it's easy to find. 
+COPY --from=compiler /target/release/meilisearch /bin/meilisearch
+COPY --from=compiler /target/release/meilitool /bin/meilitool
 # To stay compatible with the older version of the container (pre v0.27.0) we're
 # going to symlink the meilisearch binary in the path to `/meilisearch`
 RUN ln -s /bin/meilisearch /meilisearch
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 43ac2355c..3c61880bb 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -27,7 +27,7 @@ mod index_mapper;
 mod insta_snapshot;
 mod lru;
 mod utils;
-mod uuid_codec;
+pub mod uuid_codec;
 
 pub type Result<T> = std::result::Result<T, Error>;
 pub type TaskId = u32;
diff --git a/meilitool/Cargo.toml b/meilitool/Cargo.toml
new file mode 100644
index 000000000..58acd87db
--- /dev/null
+++ b/meilitool/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "meilitool"
+description = "A CLI to edit a Meilisearch database from the command line"
+version.workspace = true
+authors.workspace = true
+homepage.workspace = true
+readme.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+anyhow = "1.0.75"
+clap = { version = "4.2.1", features = ["derive"] }
+dump = { path = "../dump" }
+file-store = { path = "../file-store" }
+meilisearch-auth = { path = "../meilisearch-auth" }
+meilisearch-types = { path = "../meilisearch-types" }
+time = { version = "0.3.30", features = ["formatting"] }
+uuid = { version = "1.5.0", features = ["v4"], default-features = false }
diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs
new file mode 100644
index 000000000..2b40e42c2
--- /dev/null
+++ b/meilitool/src/main.rs
@@ -0,0 +1,312 @@
+use std::fs::{read_dir, read_to_string, remove_file, File};
+use std::io::BufWriter;
+use std::path::PathBuf;
+
+use anyhow::Context;
+use clap::{Parser, Subcommand};
+use dump::{DumpWriter, IndexMetadata};
+use file_store::FileStore;
+use meilisearch_auth::AuthController;
+use meilisearch_types::heed::types::{OwnedType, SerdeJson, Str};
+use meilisearch_types::heed::{Database, Env, EnvOpenOptions, PolyDatabase, RoTxn, RwTxn};
+use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
+use meilisearch_types::milli::{obkv_to_json, BEU32};
+use meilisearch_types::tasks::{Status, Task};
+use meilisearch_types::versioning::check_version_file;
+use meilisearch_types::Index;
+use time::macros::format_description;
+use time::OffsetDateTime;
+use uuid_codec::UuidCodec;
+
+mod uuid_codec;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    /// The database path where Meilisearch is running.
+    #[arg(long, default_value = "data.ms/")]
+    db_path: PathBuf,
+
+    #[command(subcommand)]
+    command: Command,
+}
+
+#[derive(Subcommand)]
+enum Command {
+    /// Clears the task queue, making it empty.
+    ///
+    /// This command can be safely executed even if Meilisearch is running and processing tasks.
+    /// Once the task queue is empty you can restart Meilisearch and no tasks should be visible,
+    /// not even the ones that were processing. However, you will most likely see the previously
+    /// processing tasks in the queue again, with an associated internal error message.
+    ClearTaskQueue,
+
+    /// Exports a dump from the Meilisearch database.
+    ///
+    /// Make sure to run this command when Meilisearch is not running, or is running but not
+    /// processing tasks. If tasks are being processed while the dump is being exported, the
+    /// dump is likely to be malformed and to miss tasks.
+    ///
+    /// TODO Verify this claim or make sure it cannot happen and we can export dumps
+    /// without caring about killing Meilisearch first!
+    ExportADump {
+        /// The directory in which the dump will be created.
+        #[arg(long, default_value = "dumps/")]
+        dump_dir: PathBuf,
+
+        /// Skip dumping the enqueued or processing tasks.
+        ///
+        /// Can be useful when there are a lot of them and keeping them is not particularly
+        /// useful. Note that only the enqueued tasks take up space, so skipping the processed
+        /// ones is not particularly interesting.
+        #[arg(long)]
+        skip_enqueued_tasks: bool,
+    },
+}
+
+fn main() -> anyhow::Result<()> {
+    let Cli { db_path, command } = Cli::parse();
+
+    check_version_file(&db_path).context("While checking the version file")?;
+
+    match command {
+        Command::ClearTaskQueue => clear_task_queue(db_path),
+        Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
+            export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
+        }
+    }
+}
+
+/// Clears the task queue located at `db_path`.
+fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
+    let path = db_path.join("tasks");
+    let env = EnvOpenOptions::new()
+        .max_dbs(100)
+        .open(&path)
+        .with_context(|| format!("While trying to open {:?}", path.display()))?;
+
+    eprintln!("Deleting tasks from the database...");
+
+    let mut wtxn = env.write_txn()?;
+    let all_tasks = try_opening_poly_database(&env, &wtxn, "all-tasks")?;
+    let total = all_tasks.len(&wtxn)?;
+    let status = try_opening_poly_database(&env, &wtxn, "status")?;
+    let kind = try_opening_poly_database(&env, &wtxn, "kind")?;
+    let index_tasks = try_opening_poly_database(&env, &wtxn, "index-tasks")?;
+    let canceled_by = try_opening_poly_database(&env, &wtxn, "canceled_by")?;
+    let enqueued_at = try_opening_poly_database(&env, &wtxn, "enqueued-at")?;
+    let started_at = try_opening_poly_database(&env, &wtxn, "started-at")?;
+    let finished_at = try_opening_poly_database(&env, &wtxn, "finished-at")?;
+
+    try_clearing_poly_database(&mut wtxn, all_tasks, "all-tasks")?;
+    try_clearing_poly_database(&mut wtxn, status, "status")?;
+    try_clearing_poly_database(&mut wtxn, kind, "kind")?;
+    try_clearing_poly_database(&mut wtxn, index_tasks, "index-tasks")?;
+    try_clearing_poly_database(&mut wtxn, canceled_by, "canceled_by")?;
+    try_clearing_poly_database(&mut wtxn, enqueued_at, "enqueued-at")?;
+    try_clearing_poly_database(&mut wtxn, started_at, "started-at")?;
+    try_clearing_poly_database(&mut wtxn, finished_at, "finished-at")?;
+
+    wtxn.commit().context("While committing the transaction")?;
+
+    eprintln!("Successfully deleted {total} tasks from the tasks database!");
+    eprintln!("Deleting the content files from disk...");
+
+    let mut count = 0usize;
+    let update_files = db_path.join("update_files");
+    let entries = read_dir(&update_files).with_context(|| {
+        format!("While trying to read the content of {:?}", update_files.display())
+    })?;
+    for result in entries {
+        match result {
+            Ok(ent) => match remove_file(ent.path()) {
+                Ok(_) => count += 1,
+                Err(e) => eprintln!("Error while deleting {:?}: {}", ent.path().display(), e),
+            },
+            Err(e) => {
+                eprintln!("Error while reading a file in {:?}: {}", update_files.display(), e)
+            }
+        }
+    }
+
+    eprintln!("Successfully deleted {count} content files from disk!");
+
+    Ok(())
+}
+
+fn try_opening_database<KC: 'static, DC: 'static>(
+    env: &Env,
+    rtxn: &RoTxn,
+    db_name: &str,
+) -> anyhow::Result<Database<KC, DC>> {
+    env.open_database(rtxn, Some(db_name))
+        .with_context(|| format!("While opening the {db_name:?} database"))?
+ .with_context(|| format!("Missing the {db_name:?} database")) +} + +fn try_opening_poly_database( + env: &Env, + rtxn: &RoTxn, + db_name: &str, +) -> anyhow::Result { + env.open_poly_database(rtxn, Some(db_name)) + .with_context(|| format!("While opening the {db_name:?} poly database"))? + .with_context(|| format!("Missing the {db_name:?} poly database")) +} + +fn try_clearing_poly_database( + wtxn: &mut RwTxn, + database: PolyDatabase, + db_name: &str, +) -> anyhow::Result<()> { + database.clear(wtxn).with_context(|| format!("While clearing the {db_name:?} database")) +} + +/// Exports a dump into the dump directory. +fn export_a_dump( + db_path: PathBuf, + dump_dir: PathBuf, + skip_enqueued_tasks: bool, +) -> Result<(), anyhow::Error> { + let started_at = OffsetDateTime::now_utc(); + + // 1. Extracts the instance UID from disk + let instance_uid_path = db_path.join("instance-uid"); + let instance_uid = match read_to_string(&instance_uid_path) { + Ok(content) => match content.trim().parse() { + Ok(uuid) => Some(uuid), + Err(e) => { + eprintln!("Impossible to parse instance-uid: {e}"); + None + } + }, + Err(e) => { + eprintln!("Impossible to read {}: {}", instance_uid_path.display(), e); + None + } + }; + + let dump = DumpWriter::new(instance_uid).context("While creating a new dump")?; + let file_store = + FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?; + + let index_scheduler_path = db_path.join("tasks"); + let env = EnvOpenOptions::new() + .max_dbs(100) + .open(&index_scheduler_path) + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + + eprintln!("Dumping the keys..."); + + // 2. dump the keys + let auth_store = AuthController::new(&db_path, &None) + .with_context(|| format!("While opening the auth store at {}", db_path.display()))?; + let mut dump_keys = dump.create_keys()?; + let mut count = 0; + for key in auth_store.list_keys()? { + dump_keys.push_key(&key)?; + count += 1; + } + dump_keys.flush()?; + + eprintln!("Successfully dumped {count} keys!"); + + let rtxn = env.read_txn()?; + let all_tasks: Database, SerdeJson> = + try_opening_database(&env, &rtxn, "all-tasks")?; + let index_mapping: Database = + try_opening_database(&env, &rtxn, "index-mapping")?; + + if skip_enqueued_tasks { + eprintln!("Skip dumping the enqueued tasks..."); + } else { + eprintln!("Dumping the enqueued tasks..."); + + // 3. dump the tasks + let mut dump_tasks = dump.create_tasks_queue()?; + let mut count = 0; + for ret in all_tasks.iter(&rtxn)? { + let (_, t) = ret?; + let status = t.status; + let content_file = t.content_uuid(); + let mut dump_content_file = dump_tasks.push_task(&t.into())?; + + // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. + if let Some(content_file_uuid) = content_file { + if status == Status::Enqueued { + let content_file = file_store.get_update(content_file_uuid)?; + + let reader = + DocumentsBatchReader::from_reader(content_file).with_context(|| { + format!("While reading content file {:?}", content_file_uuid) + })?; + + let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index(); + while let Some(doc) = cursor.next_document().with_context(|| { + format!("While iterating on content file {:?}", content_file_uuid) + })? 
+                    {
+                        dump_content_file
+                            .push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
+                    }
+                    dump_content_file.flush()?;
+                    count += 1;
+                }
+            }
+        }
+        dump_tasks.flush()?;
+
+        eprintln!("Successfully dumped {count} enqueued tasks!");
+    }
+
+    eprintln!("Dumping the indexes...");
+
+    // 4. Dump the indexes
+    let mut count = 0;
+    for result in index_mapping.iter(&rtxn)? {
+        let (uid, uuid) = result?;
+        let index_path = db_path.join("indexes").join(uuid.to_string());
+        let index = Index::new(EnvOpenOptions::new(), &index_path).with_context(|| {
+            format!("While trying to open the index at path {:?}", index_path.display())
+        })?;
+
+        let rtxn = index.read_txn()?;
+        let metadata = IndexMetadata {
+            uid: uid.to_owned(),
+            primary_key: index.primary_key(&rtxn)?.map(String::from),
+            created_at: index.created_at(&rtxn)?,
+            updated_at: index.updated_at(&rtxn)?,
+        };
+        let mut index_dumper = dump.create_index(uid, &metadata)?;
+
+        let fields_ids_map = index.fields_ids_map(&rtxn)?;
+        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+
+        // 4.1. Dump the documents
+        for ret in index.all_documents(&rtxn)? {
+            let (_id, doc) = ret?;
+            let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
+            index_dumper.push_document(&document)?;
+        }
+
+        // 4.2. Dump the settings
+        let settings = meilisearch_types::settings::settings(&index, &rtxn)?;
+        index_dumper.settings(&settings)?;
+        count += 1;
+    }
+
+    eprintln!("Successfully dumped {count} indexes!");
+    // We will not dump experimental feature settings
+    eprintln!("The tool does not dump experimental features; please set them by hand afterward");
+
+    let dump_uid = started_at.format(format_description!(
+        "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
+    )).unwrap();
+
+    let path = dump_dir.join(format!("{}.dump", dump_uid));
+    let file = File::create(&path)?;
+    dump.persist_to(BufWriter::new(file))?;
+
+    eprintln!("Dump exported at path {:?}", path.display());
+
+    Ok(())
+}
diff --git a/meilitool/src/uuid_codec.rs b/meilitool/src/uuid_codec.rs
new file mode 100644
index 000000000..70a92ca94
--- /dev/null
+++ b/meilitool/src/uuid_codec.rs
@@ -0,0 +1,24 @@
+use std::borrow::Cow;
+use std::convert::TryInto;
+
+use meilisearch_types::heed::{BytesDecode, BytesEncode};
+use uuid::Uuid;
+
+/// A heed codec for values of the `Uuid` struct.
+pub struct UuidCodec;
+
+impl<'a> BytesDecode<'a> for UuidCodec {
+    type DItem = Uuid;
+
+    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
+        bytes.try_into().ok().map(Uuid::from_bytes)
+    }
+}
+
+impl BytesEncode<'_> for UuidCodec {
+    type EItem = Uuid;
+
+    fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
+        Some(Cow::Borrowed(item.as_bytes()))
+    }
+}
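
A few notes on the new tool follow, each with a small illustrative sketch; none of this code is part of the diff itself.

First, clap's derive macro kebab-cases variant names, so the binary exposes the subcommands `clear-task-queue` and `export-a-dump`. The sketch below is a trimmed, self-contained mirror of the `Cli`/`Command` types from `meilitool/src/main.rs` (doc comments and the actual work elided), showing how the flags and subcommands map onto the derived parser; `try_parse_from` is clap's standard in-process entry point:

```rust
use std::path::PathBuf;

use clap::{Parser, Subcommand};

// Trimmed mirror of the Cli/Command types from meilitool/src/main.rs.
#[derive(Parser)]
struct Cli {
    #[arg(long, default_value = "data.ms/")]
    db_path: PathBuf,

    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    ClearTaskQueue,
    ExportADump {
        #[arg(long, default_value = "dumps/")]
        dump_dir: PathBuf,
        #[arg(long)]
        skip_enqueued_tasks: bool,
    },
}

fn main() {
    // The variant `ExportADump` is reachable as `export-a-dump` on the command line.
    let cli = Cli::try_parse_from([
        "meilitool",
        "--db-path",
        "data.ms/",
        "export-a-dump",
        "--skip-enqueued-tasks",
    ])
    .unwrap();

    match cli.command {
        Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
            assert!(skip_enqueued_tasks);
            // The default dump directory applies when --dump-dir is not passed.
            assert_eq!(dump_dir, PathBuf::from("dumps/"));
        }
        Command::ClearTaskQueue => unreachable!(),
    }
}
```

Inside the container built by the Dockerfile above, the equivalent invocation would be `meilitool export-a-dump --db-path data.ms/ --skip-enqueued-tasks`, since the binary is copied into `/bin`.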
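
Second, the `dump_uid` computed at the end of `export_a_dump` is a timestamp: full date, a dash, then hours, minutes, seconds, and three subsecond digits, so dumps land in `dump_dir` under names like `20231024-183045123.dump`. A minimal sketch of that formatting, assuming time's `macros` feature is available in addition to the `formatting` feature listed in `meilitool/Cargo.toml` (in the workspace build it would presumably come in through feature unification):

```rust
use time::macros::format_description;
use time::OffsetDateTime;

fn main() {
    // Same format string as in `export_a_dump`: date, a dash, then
    // hhmmss plus milliseconds, e.g. `20231024-183045123`.
    let format = format_description!(
        "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
    );
    let dump_uid = OffsetDateTime::now_utc().format(format).unwrap();
    println!("{dump_uid}.dump");
}
```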
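
Finally, the new `UuidCodec` maps a `Uuid` to and from its 16 raw bytes, and `bytes_decode` returns `None` for a slice of any other length rather than panicking. A round-trip sketch using only the `uuid` crate (with the `v4` feature, as in `meilitool/Cargo.toml`); `encode` and `decode` are illustrative stand-ins for the heed trait methods, which are elided here:

```rust
use uuid::Uuid;

// Stand-in for UuidCodec::bytes_encode: a Uuid is stored as its 16 raw bytes.
fn encode(item: &Uuid) -> Vec<u8> {
    item.as_bytes().to_vec()
}

// Stand-in for UuidCodec::bytes_decode: only 16-byte slices decode successfully.
fn decode(bytes: &[u8]) -> Option<Uuid> {
    bytes.try_into().ok().map(Uuid::from_bytes)
}

fn main() {
    let uuid = Uuid::new_v4();
    let bytes = encode(&uuid);
    assert_eq!(bytes.len(), 16);
    assert_eq!(decode(&bytes), Some(uuid));
    // A slice of the wrong length fails to decode instead of panicking.
    assert_eq!(decode(&bytes[..8]), None);
}
```

This fixed 16-byte representation is what the `index-mapping` database uses to store each index's UUID value under its `Str` uid key.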