Export embeddings

This commit is contained in:
Clément Renault 2025-06-14 12:43:24 +02:00
parent 23e25a437c
commit 87cf0970d1
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,13 +1,17 @@
use std::collections::BTreeMap;
use std::sync::atomic;
use std::time::Duration;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use meilisearch_types::milli::{obkv_to_json, Filter};
use meilisearch_types::settings::{self, SecretPolicy};
use meilisearch_types::tasks::ExportIndexSettings;
use ureq::{json, Agent};
use crate::processing::AtomicDocumentStep;
use crate::{Error, IndexScheduler, Result};
impl IndexScheduler {
@ -92,19 +96,77 @@ impl IndexScheduler {
.embedding_configs(&index_rtxn)
.map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
let total_documents = universe.len() as u32;
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
progress.update_progress(progress_step);
let limit = 50 * 1024 * 1024; // 50 MiB
let mut buffer = Vec::new();
let mut tmp_buffer = Vec::new();
for docid in universe {
for (i, docid) in universe.into_iter().enumerate() {
let document = index
.document(&index_rtxn, docid)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
let value = obkv_to_json(&all_fields, &fields_ids_map, document)
let mut document = obkv_to_json(&all_fields, &fields_ids_map, document)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// TODO definitely factorize this code
// Unless `skip_embeddings` is set, copy the embeddings stored for this document
// into the JSON document under the reserved vectors field, so that they are
// serialized together with the rest of the document below.
if !*skip_embeddings {
// Labeled block so the "no embeddings" case can leave early with `break`
// without skipping the rest of the loop iteration.
'inject_vectors: {
let embeddings = index
.embeddings(&index_rtxn, docid)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// Nothing stored for this document: leave the document untouched.
if embeddings.is_empty() {
break 'inject_vectors;
}
// Reuse the vectors field if the document already has one, otherwise
// create it as an empty JSON object.
let vectors = document
.entry(RESERVED_VECTORS_FIELD_NAME)
.or_insert(serde_json::Value::Object(Default::default()));
// The vectors field must be a JSON object; any other JSON type aborts
// the export with an `InvalidVectorsMapType` user error.
let serde_json::Value::Object(vectors) = vectors else {
return Err(Error::from_milli(
meilisearch_types::milli::Error::UserError(
meilisearch_types::milli::UserError::InvalidVectorsMapType {
// Report the external document id when it can be resolved;
// otherwise fall back to the internal docid.
// Note: inside the `if let`, `index` is shadowed by the
// resolved external id, not the index handle.
document_id: {
if let Ok(Some(Ok(index))) = index
.external_id_of(&index_rtxn, std::iter::once(docid))
.map(|it| it.into_iter().next())
{
index
} else {
format!("internal docid={docid}")
}
},
// The offending (non-object) value is cloned into the error.
value: vectors.clone(),
},
),
Some(uid.to_string()),
));
};
for (embedder_name, embeddings) in embeddings {
// A document counts as user-provided for this embedder only when a
// config with the same name exists and its `user_provided` set
// contains the docid; with no matching config this is `false`.
let user_provided = embedding_configs
.iter()
.find(|conf| conf.name == embedder_name)
.is_some_and(|conf| conf.user_provided.contains(docid));
let embeddings = ExplicitVectors {
embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
embeddings,
)),
// Embeddings that were not user-provided are flagged for regeneration.
regenerate: !user_provided,
};
// Inserting replaces any entry already present under this embedder name.
// NOTE(review): `unwrap()` panics if serialization fails; presumably
// it cannot for plain vectors of floats — confirm.
vectors
.insert(embedder_name, serde_json::to_value(embeddings).unwrap());
}
}
}
tmp_buffer.clear();
serde_json::to_writer(&mut tmp_buffer, &value)
serde_json::to_writer(&mut tmp_buffer, &document)
.map_err(meilisearch_types::milli::InternalError::from)
.map_err(|e| Error::from_milli(e.into(), Some(uid.to_string())))?;
@ -114,9 +176,14 @@ impl IndexScheduler {
buffer.clear();
}
buffer.extend_from_slice(&tmp_buffer);
// Advance the progress counter in batches of 100 rather than once per document.
// NOTE(review): the condition is also true for `i == 0`, so the counter is
// bumped by 100 right after the first document and can run ahead of the real
// count until the final `store` after the loop — consider `i > 0 && i % 100 == 0`.
if i % 100 == 0 {
step.fetch_add(100, atomic::Ordering::Relaxed);
}
}
// Send whatever is still in `buffer` after the loop.
// NOTE(review): `unwrap()` panics if the call fails instead of returning an
// error to the caller like the `?` sites above do — confirm this is intended.
// NOTE(review): the call is made even when `buffer` is empty — verify the
// remote accepts an empty payload.
post_serialized_documents(&agent, url, uid, api_key, &buffer).unwrap();
// Set the progress counter to the exact total, overriding the batched increments.
step.store(total_documents, atomic::Ordering::Relaxed);
}
Ok(())