Move the channel sender in the closure to stop the merger thread

This commit is contained in:
Clément Renault 2024-09-03 16:08:33 +02:00
parent da61408e52
commit 52d32b4ee9
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 16 additions and 17 deletions

View File

@ -22,21 +22,20 @@ use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
use std::io::BufWriter;
use std::sync::RwLock;
use dump::IndexMetadata;
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges};
use meilisearch_types::milli::update::{
self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
};
use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
use meilisearch_types::milli::{self, Filter, GlobalFieldsIdsMap, Object, UserError};
use meilisearch_types::milli::{self, Filter, Object};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@ -1364,17 +1363,18 @@ impl IndexScheduler {
indexer::index(index_wtxn, index, fields_ids_map, &pool, document_changes)?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} else if primary_key_has_been_set {
// Everything failed but we've set a primary key.
// We need to remove it.
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.reset_primary_key();
builder.execute(
|indexing_step| tracing::trace!(update = ?indexing_step),
|| must_stop_processing.clone().get(),
)?;
}
// else if primary_key_has_been_set {
// // Everything failed but we've set a primary key.
// // We need to remove it.
// let mut builder =
// milli::update::Settings::new(index_wtxn, index, indexer_config);
// builder.reset_primary_key();
// builder.execute(
// |indexing_step| tracing::trace!(update = ?indexing_step),
// || must_stop_processing.clone().get(),
// )?;
// }
Ok(tasks)
}

View File

@ -65,7 +65,7 @@ where
thread::scope(|s| {
// TODO manage the errors correctly
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| {
let document_changes = document_changes.into_par_iter();
// word docids
@ -85,7 +85,7 @@ where
})?;
// TODO manage the errors correctly
let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
let rtxn = index.read_txn().unwrap();
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
})?;

View File

@ -6,7 +6,6 @@ use super::channel::{MergerReceiver, MergerSender};
use super::KvReaderDelAdd;
use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Index, Result};
/// TODO We must return some infos/stats