Send large payload through crossbeam

This commit is contained in:
Clément Renault 2024-11-27 18:06:43 +01:00
parent 5c488e20cc
commit 58eab9a018
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
2 changed files with 266 additions and 36 deletions

View file

@ -16,6 +16,7 @@ use rand::SeedableRng as _;
use raw_collections::RawMap;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
use {LargeEntry, LargeVector};
use super::channel::*;
use super::extract::*;
@ -40,7 +41,7 @@ use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
use crate::{
Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
ThreadPoolNoAbortBuilder, UserError,
@ -132,7 +133,8 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
let _entered = span.enter();
extract(document_changes,
extract(
document_changes,
&document_extractor,
indexing_context,
&mut extractor_allocs,
@ -416,7 +418,7 @@ where
match action {
ReceiverAction::WakeUp => (),
ReceiverAction::LargeEntry { database, key, value } => {
ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => {
let database_name = database.database_name();
let database = database.database(index);
if let Err(error) = database.put(wtxn, &key, &value) {
@ -428,6 +430,24 @@ where
}));
}
}
ReceiverAction::LargeVector(large_vector) => {
let embedding = large_vector.read_embedding();
let LargeVector { docid, embedder_id, .. } = large_vector;
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, embedding)?;
}
ReceiverAction::LargeVectors(large_vectors) => {
let LargeVectors { docid, embedder_id, .. } = large_vectors;
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
writer.del_items(wtxn, *dimensions, docid)?;
let mut embeddings = Embeddings::new(*dimensions);
for embedding in large_vectors.read_embeddings() {
embeddings.push(embedding.to_vec()).unwrap();
}
}
}
// Every time the is a message in the channel we search
@ -582,6 +602,19 @@ fn write_from_bbqueue(
writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, embedding)?;
}
EntryHeader::ArroySetVectors(asvs) => {
let ArroySetVectors { docid, embedder_id, .. } = asvs;
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
writer.del_items(wtxn, *dimensions, docid)?;
for index in 0.. {
match asvs.read_embedding_into_vec(frame, index, aligned_embedding) {
Some(embedding) => writer.add_item(wtxn, docid, embedding)?,
None => break,
}
}
}
}
}