From 52d32b4ee9ddf3ed62050ac1ad0da77561406da4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 3 Sep 2024 16:08:33 +0200 Subject: [PATCH] Move the channel sender in the closure to stop the merger thread --- index-scheduler/src/batch.rs | 28 ++++++++++++++-------------- milli/src/update/new/indexer/mod.rs | 4 ++-- milli/src/update/new/merger.rs | 1 - 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index ecb44fc14..129dbec10 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -22,21 +22,20 @@ use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; -use std::sync::RwLock; use dump::IndexMetadata; use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; -use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; +use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges}; use meilisearch_types::milli::update::{ - self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, + IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, }; use meilisearch_types::milli::vector::parsed_vectors::{ ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, }; -use meilisearch_types::milli::{self, Filter, GlobalFieldsIdsMap, Object, UserError}; +use meilisearch_types::milli::{self, Filter, Object}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; @@ -1364,17 +1363,18 @@ impl IndexScheduler { indexer::index(index_wtxn, index, fields_ids_map, &pool, document_changes)?; // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); - } else if primary_key_has_been_set { - // Everything failed but we've set a primary key. - // We need to remove it. - let mut builder = - milli::update::Settings::new(index_wtxn, index, indexer_config); - builder.reset_primary_key(); - builder.execute( - |indexing_step| tracing::trace!(update = ?indexing_step), - || must_stop_processing.clone().get(), - )?; } + // else if primary_key_has_been_set { + // // Everything failed but we've set a primary key. + // // We need to remove it. + // let mut builder = + // milli::update::Settings::new(index_wtxn, index, indexer_config); + // builder.reset_primary_key(); + // builder.execute( + // |indexing_step| tracing::trace!(update = ?indexing_step), + // || must_stop_processing.clone().get(), + // )?; + // } Ok(tasks) } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 50bb5a401..1b763f5f9 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -65,7 +65,7 @@ where thread::scope(|s| { // TODO manage the errors correctly - let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { + let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.in_place_scope(|_s| { let document_changes = document_changes.into_par_iter(); // word docids @@ -85,7 +85,7 @@ where })?; // TODO manage the errors correctly - let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, || { + let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { let rtxn = index.read_txn().unwrap(); merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index) })?; diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index 89d0762f0..e07262de8 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -6,7 +6,6 @@ use super::channel::{MergerReceiver, MergerSender}; use super::KvReaderDelAdd; use crate::update::del_add::DelAdd; use crate::update::new::channel::MergerOperation; -use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{CboRoaringBitmapCodec, Index, Result}; /// TODO We must return some infos/stats