From e8795d2608326dff111098d64ea25b646ff4361c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 14 Jun 2025 12:43:24 +0200 Subject: [PATCH] Export embeddings --- .../src/scheduler/process_export.rs | 73 ++++++++++++++++++- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs index e01ddf2e4..1686472ab 100644 --- a/crates/index-scheduler/src/scheduler/process_export.rs +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -1,13 +1,17 @@ use std::collections::BTreeMap; +use std::sync::atomic; use std::time::Duration; use meilisearch_types::index_uid_pattern::IndexUidPattern; +use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; +use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use meilisearch_types::milli::{obkv_to_json, Filter}; use meilisearch_types::settings::{self, SecretPolicy}; use meilisearch_types::tasks::ExportIndexSettings; use ureq::{json, Agent}; +use crate::processing::AtomicDocumentStep; use crate::{Error, IndexScheduler, Result}; impl IndexScheduler { @@ -92,19 +96,77 @@ impl IndexScheduler { .embedding_configs(&index_rtxn) .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; + let total_documents = universe.len() as u32; + let (step, progress_step) = AtomicDocumentStep::new(total_documents); + progress.update_progress(progress_step); + let limit = 50 * 1024 * 1024; // 50 MiB let mut buffer = Vec::new(); let mut tmp_buffer = Vec::new(); - for docid in universe { + for (i, docid) in universe.into_iter().enumerate() { let document = index .document(&index_rtxn, docid) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; - let value = obkv_to_json(&all_fields, &fields_ids_map, document) + let mut document = obkv_to_json(&all_fields, &fields_ids_map, document) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + // TODO definitely factorize this code + if !*skip_embeddings { + 'inject_vectors: { + let embeddings = index + .embeddings(&index_rtxn, docid) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + + if embeddings.is_empty() { + break 'inject_vectors; + } + + let vectors = document + .entry(RESERVED_VECTORS_FIELD_NAME) + .or_insert(serde_json::Value::Object(Default::default())); + + let serde_json::Value::Object(vectors) = vectors else { + return Err(Error::from_milli( + meilisearch_types::milli::Error::UserError( + meilisearch_types::milli::UserError::InvalidVectorsMapType { + document_id: { + if let Ok(Some(Ok(index))) = index + .external_id_of(&index_rtxn, std::iter::once(docid)) + .map(|it| it.into_iter().next()) + { + index + } else { + format!("internal docid={docid}") + } + }, + value: vectors.clone(), + }, + ), + Some(uid.to_string()), + )); + }; + + for (embedder_name, embeddings) in embeddings { + let user_provided = embedding_configs + .iter() + .find(|conf| conf.name == embedder_name) + .is_some_and(|conf| conf.user_provided.contains(docid)); + + let embeddings = ExplicitVectors { + embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors( + embeddings, + )), + regenerate: !user_provided, + }; + vectors + .insert(embedder_name, serde_json::to_value(embeddings).unwrap()); + } + } + } + tmp_buffer.clear(); - serde_json::to_writer(&mut tmp_buffer, &value) + serde_json::to_writer(&mut tmp_buffer, &document) .map_err(meilisearch_types::milli::InternalError::from) .map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?; @@ -114,9 +176,14 @@ impl IndexScheduler { buffer.clear(); } buffer.extend_from_slice(&tmp_buffer); + + if i % 100 == 0 { + step.fetch_add(100, atomic::Ordering::Relaxed); + } } post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap(); + step.store(total_documents, atomic::Ordering::Relaxed); } Ok(())