5351: Bring back v1.13.0 changes into main r=irevoire a=Kerollmops

This PR brings back the changes made in v1.13 into the main branch.

Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: Clémentine <clementine@meilisearch.com>
Co-authored-by: meili-bors[bot] <89034592+meili-bors[bot]@users.noreply.github.com>
Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-02-18 08:05:02 +00:00 committed by GitHub
commit 0f1aeb8eaa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
101 changed files with 8351 additions and 1518 deletions

View file

@ -5,6 +5,8 @@ use std::marker::PhantomData;
use std::mem;
use std::num::NonZeroU16;
use std::ops::Range;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameProducer};
@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue(
consumer
});
let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
let sender = ExtractorBbqueueSender {
sender,
producers,
max_grant,
sent_messages_attempts: sent_messages_attempts.clone(),
blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
};
let receiver = WriterBbqueueReceiver {
receiver,
look_at_consumer: (0..consumers.len()).cycle(),
consumers,
sent_messages_attempts,
blocking_sent_messages_attempts,
};
(sender, receiver)
}
@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> {
/// It will never be able to store more than that as the
/// buffer cannot split data into two parts.
max_grant: usize,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// messages failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
}
pub struct WriterBbqueueReceiver<'a> {
@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> {
look_at_consumer: Cycle<Range<usize>>,
/// The BBQueue frames to read when waking-up.
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// message failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
}
/// The action to perform on the receiver/writer side.
@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
}
None
}
/// Returns the total count of attempts to send messages through the BBQueue channel.
pub fn sent_messages_attempts(&self) -> usize {
self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
/// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
pub fn blocking_sent_messages_attempts(&self) -> usize {
self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
}
pub struct FrameWithHeader<'a> {
@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
payload_header.serialize_into(grant);
Ok(())
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
payload_header.serialize_into(grant);
Ok(())
},
)?;
Ok(())
}
@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
if dimensions != 0 {
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
if dimensions != 0 {
let output_iter =
remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
}
}
}
Ok(())
})?;
Ok(())
},
)?;
Ok(())
}
@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
},
)?;
Ok(())
}
@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
},
)?;
Ok(())
}
@ -637,12 +701,18 @@ fn reserve_and_write_grant<F>(
producer: &mut FrameProducer,
total_length: usize,
sender: &flume::Sender<ReceiverAction>,
sent_messages_attempts: &AtomicUsize,
blocking_sent_messages_attempts: &AtomicUsize,
f: F,
) -> crate::Result<()>
where
F: FnOnce(&mut [u8]) -> crate::Result<()>,
{
loop {
// An attempt means trying multiple times
// whether is succeeded or not.
sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
for _ in 0..10_000 {
match producer.grant(total_length) {
Ok(mut grant) => {
@ -666,6 +736,10 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
// We made an attempt to send a message in the
// bbqueue channel but it didn't succeed.
blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
// We prefer to yield and allow the writing thread
// to do its job, especially beneficial when there
// is only one CPU core available.

View file

@ -144,7 +144,7 @@ impl<'doc> Update<'doc> {
)?)
}
pub fn updated(&self) -> DocumentFromVersions<'_, 'doc> {
pub fn only_changed_fields(&self) -> DocumentFromVersions<'_, 'doc> {
DocumentFromVersions::new(&self.new)
}
@ -182,7 +182,7 @@ impl<'doc> Update<'doc> {
let mut cached_current = None;
let mut updated_selected_field_count = 0;
for entry in self.updated().iter_top_level_fields() {
for entry in self.only_changed_fields().iter_top_level_fields() {
let (key, updated_value) = entry?;
if perm_json_p::select_field(key, fields, &[]) == perm_json_p::Selection::Skip {
@ -241,7 +241,7 @@ impl<'doc> Update<'doc> {
Ok(has_deleted_fields)
}
pub fn updated_vectors(
pub fn only_changed_vectors(
&self,
doc_alloc: &'doc Bump,
embedders: &'doc EmbeddingConfigs,

View file

@ -199,7 +199,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
.transpose()?;
let updated_geo = update
.updated()
.merged(rtxn, index, db_fields_ids_map)?
.geo_field()?
.map(|geo| extract_geo_coordinates(external_id, geo))
.transpose()?;

View file

@ -99,7 +99,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a, 'b> {
context.db_fields_ids_map,
&context.doc_alloc,
)?;
let new_vectors = update.updated_vectors(&context.doc_alloc, self.embedders)?;
let new_vectors =
update.only_changed_vectors(&context.doc_alloc, self.embedders)?;
if let Some(new_vectors) = &new_vectors {
unused_vectors_distribution.append(new_vectors)?;

View file

@ -234,7 +234,7 @@ where
);
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
extract(
@ -247,7 +247,7 @@ where
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors");
let _entered = span.enter();
for config in &mut index_embeddings {

View file

@ -1,5 +1,5 @@
use std::sync::atomic::AtomicBool;
use std::sync::RwLock;
use std::sync::{Once, RwLock};
use std::thread::{self, Builder};
use big_s::S;
@ -33,6 +33,8 @@ mod post_processing;
mod update_by_function;
mod write;
static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@ -93,6 +95,15 @@ where
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
@ -179,13 +190,16 @@ where
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
build_vectors(
index,
wtxn,
index_embeddings,
&mut arroy_writers,
&indexing_context.must_stop_processing,
)?;
pool.install(|| {
build_vectors(
index,
wtxn,
index_embeddings,
&mut arroy_writers,
&indexing_context.must_stop_processing,
)
})
.unwrap()?;
post_processing::post_process(
indexing_context,

View file

@ -72,11 +72,23 @@ pub(super) fn write_to_db(
&mut aligned_embedding,
)?;
}
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
let direct_attempts = writer_receiver.sent_messages_attempts();
let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
tracing::debug!(
"Channel congestion metrics - \
Attempts: {direct_attempts}, \
Blocked attempts: {blocking_attempts} \
({congestion_pct:.1}% congestion)"
);
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::vectors")]
#[tracing::instrument(level = "debug", skip_all, target = "indexing::vectors")]
pub(super) fn build_vectors<MSP>(
index: &Index,
wtxn: &mut RwTxn<'_>,

View file

@ -1,7 +1,9 @@
mod v1_12;
mod v1_13;
use heed::RwTxn;
use v1_12::{V1_12_3_To_Current, V1_12_To_V1_12_3};
use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3};
use v1_13::V1_13_0_To_Current;
use crate::progress::{Progress, VariableNameStep};
use crate::{Index, InternalError, Result};
@ -26,11 +28,13 @@ pub fn upgrade(
progress: Progress,
) -> Result<bool> {
let from = index.get_version(wtxn)?.unwrap_or(db_version);
let upgrade_functions: &[&dyn UpgradeIndex] = &[&V1_12_To_V1_12_3 {}, &V1_12_3_To_Current()];
let upgrade_functions: &[&dyn UpgradeIndex] =
&[&V1_12_To_V1_12_3 {}, &V1_12_3_To_V1_13_0 {}, &V1_13_0_To_Current()];
let start = match from {
(1, 12, 0..=2) => 0,
(1, 12, 3..) => 1,
(1, 13, 0) => 2,
// We must handle the current version in the match because in case of a failure some index may have been upgraded but not other.
(1, 13, _) => return Ok(false),
(major, minor, patch) => {

View file

@ -1,11 +1,9 @@
use heed::RwTxn;
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use super::UpgradeIndex;
use crate::progress::Progress;
use crate::{make_enum_progress, Index, Result};
use super::UpgradeIndex;
#[allow(non_camel_case_types)]
pub(super) struct V1_12_To_V1_12_3 {}
@ -33,9 +31,9 @@ impl UpgradeIndex for V1_12_To_V1_12_3 {
}
#[allow(non_camel_case_types)]
pub(super) struct V1_12_3_To_Current();
pub(super) struct V1_12_3_To_V1_13_0 {}
impl UpgradeIndex for V1_12_3_To_Current {
impl UpgradeIndex for V1_12_3_To_V1_13_0 {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
@ -43,14 +41,11 @@ impl UpgradeIndex for V1_12_3_To_Current {
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
// recompute the indexes stats
Ok(true)
}
fn target_version(&self) -> (u32, u32, u32) {
(
VERSION_MAJOR.parse().unwrap(),
VERSION_MINOR.parse().unwrap(),
VERSION_PATCH.parse().unwrap(),
)
(1, 13, 0)
}
}

View file

@ -0,0 +1,29 @@
use heed::RwTxn;
use super::UpgradeIndex;
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::Progress;
use crate::{Index, Result};
#[allow(non_camel_case_types)]
pub(super) struct V1_13_0_To_Current();
impl UpgradeIndex for V1_13_0_To_Current {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
_index: &Index,
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
}
fn target_version(&self) -> (u32, u32, u32) {
(
VERSION_MAJOR.parse().unwrap(),
VERSION_MINOR.parse().unwrap(),
VERSION_PATCH.parse().unwrap(),
)
}
}