diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index da0f9cbeb..a85aa77ba 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -1,5 +1,5 @@ use std::fs::{read_dir, read_to_string, remove_file, File}; -use std::io::BufWriter; +use std::io::{BufWriter, Write as _}; use std::path::PathBuf; use std::time::Instant; @@ -12,11 +12,14 @@ use meilisearch_types::heed::types::{SerdeJson, Str}; use meilisearch_types::heed::{ CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, }; +use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; +use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use meilisearch_types::milli::{obkv_to_json, BEU32}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::versioning::{get_version, parse_version}; use meilisearch_types::Index; +use serde_json::Value::Object; use time::macros::format_description; use time::OffsetDateTime; use upgrade::OfflineUpgrade; @@ -68,6 +71,20 @@ enum Command { skip_enqueued_tasks: bool, }, + /// Exports the documents of an index in NDJSON format from a Meilisearch index to stdout. + /// + /// This command can be executed on a running Meilisearch database. However, please note that + /// it will maintain a read-only transaction for the duration of the extraction process. + ExportDocuments { + /// The index name to export the documents from. + #[arg(long)] + index_name: String, + + /// Do not export vectors with the documents. + #[arg(long)] + ignore_vectors: bool, + }, + /// Attempts to upgrade from one major version to the next without a dump. /// /// Make sure to run this commmand when Meilisearch is not running! 
@@ -114,6 +131,9 @@ fn main() -> anyhow::Result<()> {
         Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
             export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
         }
+        Command::ExportDocuments { index_name, ignore_vectors } => {
+            export_documents(db_path, index_name, ignore_vectors)
+        }
         Command::OfflineUpgrade { target_version } => {
             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
             OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
@@ -443,3 +463,122 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
 
     bail!("Target index {index_name} not found!")
 }
+
+/// Streams every document of the index named `index_name` to stdout as NDJSON,
+/// one JSON object per line.
+///
+/// The index is looked up by name in the `index-mapping` database of the
+/// environment at `db_path/tasks` and opened from `db_path/indexes/{uuid}`.
+/// A read transaction is held on the index for the whole export.
+///
+/// Unless `ignore_vectors` is set, the embeddings stored for a document are
+/// injected under the reserved vectors field, one entry per embedder; an entry
+/// gets `regenerate: false` only when its embedder configuration lists the
+/// document as user-provided.
+///
+/// # Errors
+///
+/// Propagates any failure from opening or reading the environments and from
+/// writing to stdout. Also fails when vectors must be injected into a document
+/// whose reserved vectors field is not a JSON object, or when no index is
+/// named `index_name`.
+fn export_documents(
+    db_path: PathBuf,
+    index_name: String,
+    ignore_vectors: bool,
+) -> anyhow::Result<()> {
+    let index_scheduler_path = db_path.join("tasks");
+    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
+
+    let rtxn = env.read_txn()?;
+    // NOTE(review): `Database` carries no codec type parameters in this view;
+    // confirm the key/value codecs against the other `index-mapping` readers.
+    let index_mapping: Database =
+        try_opening_database(&env, &rtxn, "index-mapping")?;
+
+    for result in index_mapping.iter(&rtxn)? {
+        let (uid, uuid) = result?;
+
+        if uid != index_name {
+            eprintln!("Found index {uid} but it's not the right index...");
+            continue;
+        }
+
+        let index_path = db_path.join("indexes").join(uuid.to_string());
+        let index =
+            Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
+                format!("While trying to open the index at path {:?}", index_path.display())
+            })?;
+
+        let rtxn = index.read_txn()?;
+        let fields_ids_map = index.fields_ids_map(&rtxn)?;
+        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+        let embedding_configs = index.embedding_configs(&rtxn)?;
+
+        let mut stdout = BufWriter::new(std::io::stdout());
+        for ret in index.all_documents(&rtxn)? {
+            let (id, doc) = ret?;
+            let mut document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
+
+            if !ignore_vectors {
+                'inject_vectors: {
+                    let embeddings = index.embeddings(&rtxn, id)?;
+
+                    if embeddings.is_empty() {
+                        break 'inject_vectors;
+                    }
+
+                    let vectors = document
+                        .entry(RESERVED_VECTORS_FIELD_NAME)
+                        .or_insert(Object(Default::default()));
+
+                    let Object(vectors) = vectors else {
+                        // Report the external document id when it can be resolved,
+                        // otherwise fall back to the internal one.
+                        let document_id = if let Ok(Some(Ok(external_id))) = index
+                            .external_id_of(&rtxn, std::iter::once(id))
+                            .map(|it| it.into_iter().next())
+                        {
+                            external_id
+                        } else {
+                            format!("internal docid={id}")
+                        };
+
+                        return Err(meilisearch_types::milli::Error::UserError(
+                            meilisearch_types::milli::UserError::InvalidVectorsMapType {
+                                document_id,
+                                value: vectors.clone(),
+                            },
+                        )
+                        .into());
+                    };
+
+                    for (embedder_name, embeddings) in embeddings {
+                        let user_provided = embedding_configs
+                            .iter()
+                            .find(|conf| conf.name == embedder_name)
+                            .is_some_and(|conf| conf.user_provided.contains(id));
+
+                        let embeddings = ExplicitVectors {
+                            embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
+                                embeddings,
+                            )),
+                            regenerate: !user_provided,
+                        };
+                        vectors.insert(embedder_name, serde_json::to_value(embeddings)?);
+                    }
+                }
+            }
+
+            serde_json::to_writer(&mut stdout, &document)?;
+            // NDJSON is newline-delimited: terminate every document with `\n`.
+            stdout.write_all(b"\n")?;
+        }
+
+        stdout.flush()?;
+        return Ok(());
+    }
+
+    bail!("Target index {index_name} not found!")
+}