diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 88a4c2f77..f82f4af37 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -76,7 +76,7 @@ where MSP: Fn() -> bool + Sync, SP: Fn(Progress) + Sync, { - /// TODO restrict memory and remove this memory from the extractors bum allocators + /// TODO restrict memory and remove this memory from the extractors bump allocators let bbbuffers: Vec<_> = (0..rayon::current_num_threads()) .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread .collect(); @@ -100,6 +100,7 @@ where send_progress, }; + let mut index_embeddings = index.embedding_configs(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?; @@ -296,7 +297,6 @@ where 'vectors: { - let mut index_embeddings = index.embedding_configs(&rtxn)?; if index_embeddings.is_empty() { break 'vectors; } @@ -322,8 +322,6 @@ where } } } - - embedding_sender.finish(index_embeddings).unwrap(); } 'geo: { @@ -457,46 +455,47 @@ where embeddings.append(embedding).unwrap(); } - writer.del_items(wtxn, *dimensions, docid)?; - writer.add_items(wtxn, docid, &embeddings)?; - } - ArroyOperation::SetVector { docid, embedder_id, embedding } => { - let (_, _, writer, dimensions) = arroy_writers - .get(&embedder_id) - .expect("requested a missing embedder"); - writer.del_items(wtxn, *dimensions, docid)?; - writer.add_item(wtxn, docid, &embedding)?; - } - ArroyOperation::Finish { configs } => { - let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); - let _entered = span.enter(); - - (indexing_context.send_progress)(Progress::from_step( - Step::WritingEmbeddingsToDatabase, - )); - - for ( - _embedder_index, - (_embedder_name, _embedder, writer, dimensions), - ) in &mut arroy_writers - { - let dimensions = *dimensions; - writer.build_and_quantize( - wtxn, - &mut rng, - dimensions, - false, - &indexing_context.must_stop_processing, - )?; - } - - index.put_embedding_configs(wtxn, configs)?; - } - }, - } + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_items(wtxn, docid, &embeddings)?; + } + ArroyOperation::SetVector { docid, embedder_id, embedding } => { + let (_, _, writer, dimensions) = + arroy_writers.get(&embedder_id).expect("requested a missing embedder"); + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_item(wtxn, docid, &embedding)?; + } + _otherwise => unreachable!(), + }, } } + 'vectors: { + let span = + tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); + let _entered = span.enter(); + + if index_embeddings.is_empty() { + break 'vectors; + } + + (indexing_context.send_progress)(Progress::from_step( + Step::WritingEmbeddingsToDatabase, + )); + + for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers { + let dimensions = *dimensions; + writer.build_and_quantize( + wtxn, + &mut rng, + dimensions, + false, + &indexing_context.must_stop_processing, + )?; + } + + index.put_embedding_configs(wtxn, index_embeddings)?; + } + (indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors)); let facet_field_ids_delta = extractor_handle.join().unwrap()?;