mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-02-06 18:43:28 +01:00
Create a new export documents meilitool subcommand based on v1.12
This commit is contained in:
parent
78867b6852
commit
2ea5c57871
@ -1,5 +1,5 @@
|
|||||||
use std::fs::{read_dir, read_to_string, remove_file, File};
|
use std::fs::{read_dir, read_to_string, remove_file, File};
|
||||||
use std::io::BufWriter;
|
use std::io::{BufWriter, Write as _};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
@ -12,11 +12,14 @@ use meilisearch_types::heed::types::{SerdeJson, Str};
|
|||||||
use meilisearch_types::heed::{
|
use meilisearch_types::heed::{
|
||||||
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
|
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
|
||||||
};
|
};
|
||||||
|
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
|
||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||||
|
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
|
||||||
use meilisearch_types::milli::{obkv_to_json, BEU32};
|
use meilisearch_types::milli::{obkv_to_json, BEU32};
|
||||||
use meilisearch_types::tasks::{Status, Task};
|
use meilisearch_types::tasks::{Status, Task};
|
||||||
use meilisearch_types::versioning::{get_version, parse_version};
|
use meilisearch_types::versioning::{get_version, parse_version};
|
||||||
use meilisearch_types::Index;
|
use meilisearch_types::Index;
|
||||||
|
use serde_json::Value::Object;
|
||||||
use time::macros::format_description;
|
use time::macros::format_description;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use upgrade::OfflineUpgrade;
|
use upgrade::OfflineUpgrade;
|
||||||
@ -68,6 +71,20 @@ enum Command {
|
|||||||
skip_enqueued_tasks: bool,
|
skip_enqueued_tasks: bool,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Exports the documents of an index in NDJSON format from a Meilisearch index to stdout.
|
||||||
|
///
|
||||||
|
/// This command can be executed on a running Meilisearch database. However, please note that
|
||||||
|
/// it will maintain a read-only transaction for the duration of the extraction process.
|
||||||
|
ExportDocuments {
|
||||||
|
/// The index name to export the documents from.
|
||||||
|
#[arg(long)]
|
||||||
|
index_name: String,
|
||||||
|
|
||||||
|
/// Do not export vectors with the documents.
|
||||||
|
#[arg(long)]
|
||||||
|
ignore_vectors: bool,
|
||||||
|
},
|
||||||
|
|
||||||
/// Attempts to upgrade from one major version to the next without a dump.
|
/// Attempts to upgrade from one major version to the next without a dump.
|
||||||
///
|
///
|
||||||
/// Make sure to run this commmand when Meilisearch is not running!
|
/// Make sure to run this commmand when Meilisearch is not running!
|
||||||
@ -114,6 +131,9 @@ fn main() -> anyhow::Result<()> {
|
|||||||
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
||||||
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
|
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
|
||||||
}
|
}
|
||||||
|
Command::ExportDocuments { index_name, ignore_vectors } => {
|
||||||
|
export_documents(db_path, index_name, ignore_vectors)
|
||||||
|
}
|
||||||
Command::OfflineUpgrade { target_version } => {
|
Command::OfflineUpgrade { target_version } => {
|
||||||
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
||||||
OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
|
OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
|
||||||
@ -443,3 +463,96 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
bail!("Target index {index_name} not found!")
|
bail!("Target index {index_name} not found!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn export_documents(
|
||||||
|
db_path: PathBuf,
|
||||||
|
index_name: String,
|
||||||
|
ignore_vectors: bool,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
|
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
|
||||||
|
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
||||||
|
|
||||||
|
let rtxn = env.read_txn()?;
|
||||||
|
let index_mapping: Database<Str, UuidCodec> =
|
||||||
|
try_opening_database(&env, &rtxn, "index-mapping")?;
|
||||||
|
|
||||||
|
for result in index_mapping.iter(&rtxn)? {
|
||||||
|
let (uid, uuid) = result?;
|
||||||
|
if uid == index_name {
|
||||||
|
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||||
|
let index =
|
||||||
|
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
|
||||||
|
format!("While trying to open the index at path {:?}", index_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let rtxn = index.read_txn()?;
|
||||||
|
let fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||||
|
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||||
|
let embedding_configs = index.embedding_configs(&rtxn)?;
|
||||||
|
|
||||||
|
let mut stdout = BufWriter::new(std::io::stdout());
|
||||||
|
for ret in index.all_documents(&rtxn)? {
|
||||||
|
let (id, doc) = ret?;
|
||||||
|
let mut document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
|
||||||
|
|
||||||
|
if !ignore_vectors {
|
||||||
|
'inject_vectors: {
|
||||||
|
let embeddings = index.embeddings(&rtxn, id)?;
|
||||||
|
|
||||||
|
if embeddings.is_empty() {
|
||||||
|
break 'inject_vectors;
|
||||||
|
}
|
||||||
|
|
||||||
|
let vectors = document
|
||||||
|
.entry(RESERVED_VECTORS_FIELD_NAME)
|
||||||
|
.or_insert(Object(Default::default()));
|
||||||
|
|
||||||
|
let Object(vectors) = vectors else {
|
||||||
|
return Err(meilisearch_types::milli::Error::UserError(
|
||||||
|
meilisearch_types::milli::UserError::InvalidVectorsMapType {
|
||||||
|
document_id: {
|
||||||
|
if let Ok(Some(Ok(index))) = index
|
||||||
|
.external_id_of(&rtxn, std::iter::once(id))
|
||||||
|
.map(|it| it.into_iter().next())
|
||||||
|
{
|
||||||
|
index
|
||||||
|
} else {
|
||||||
|
format!("internal docid={id}")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
value: vectors.clone(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.into());
|
||||||
|
};
|
||||||
|
|
||||||
|
for (embedder_name, embeddings) in embeddings {
|
||||||
|
let user_provided = embedding_configs
|
||||||
|
.iter()
|
||||||
|
.find(|conf| conf.name == embedder_name)
|
||||||
|
.is_some_and(|conf| conf.user_provided.contains(id));
|
||||||
|
|
||||||
|
let embeddings = ExplicitVectors {
|
||||||
|
embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
|
||||||
|
embeddings,
|
||||||
|
)),
|
||||||
|
regenerate: !user_provided,
|
||||||
|
};
|
||||||
|
vectors
|
||||||
|
.insert(embedder_name, serde_json::to_value(embeddings).unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serde_json::to_writer(&mut stdout, &document)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
stdout.flush()?;
|
||||||
|
} else {
|
||||||
|
eprintln!("Found index {uid} but it's not the right index...");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user