Use the new rayon-par-bridge library

This commit is contained in:
Clément Renault 2024-05-12 14:45:25 +02:00
parent b67d385cf0
commit 81ec0abad1
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 14 additions and 8 deletions

10
Cargo.lock generated
View File

@ -3510,6 +3510,7 @@ dependencies = [
"rand", "rand",
"rand_pcg", "rand_pcg",
"rayon", "rayon",
"rayon-par-bridge",
"rhai", "rhai",
"roaring", "roaring",
"rstar", "rstar",
@ -4289,6 +4290,15 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "rayon-par-bridge"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb6a14d8f65834aca6b0fe4cbbd7a27e639cd3efb1f2a32de9942368f1991de8"
dependencies = [
"rayon",
]
[[package]] [[package]]
name = "reborrow" name = "reborrow"
version = "0.5.5" version = "0.5.5"

View File

@ -84,6 +84,7 @@ rand = "0.8.5"
tracing = "0.1.40" tracing = "0.1.40"
ureq = { version = "2.10.0", features = ["json"] } ureq = { version = "2.10.0", features = ["json"] }
url = "2.5.2" url = "2.5.2"
rayon-par-bridge = "0.1.0"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -44,7 +44,7 @@ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
}; };
use crate::vector::EmbeddingConfigs; use crate::vector::EmbeddingConfigs;
use crate::{CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result}; use crate::{CboRoaringBitmapCodec, Index, Object, Result};
static MERGED_DATABASE_COUNT: usize = 7; static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 4; static PREFIX_DATABASE_COUNT: usize = 4;
@ -262,11 +262,8 @@ where
Ok(DocumentEdition::Nothing) as Result<_> Ok(DocumentEdition::Nothing) as Result<_>
}); });
std::thread::scope(|s| { rayon_par_bridge::par_bridge(100, processing, |iterator| {
let (send, recv) = std::sync::mpsc::sync_channel(100); for result in iterator {
s.spawn(move || processing.for_each(|el| drop(send.send(el))));
for result in recv {
if (self.should_abort)() { if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation)); return Err(Error::InternalError(InternalError::AbortedIndexation));
} }
@ -285,8 +282,6 @@ where
Ok(()) Ok(())
})?; })?;
drop(immutable_obkvs);
let file = documents_batch_builder.into_inner()?; let file = documents_batch_builder.into_inner()?;
let reader = DocumentsBatchReader::from_reader(file)?; let reader = DocumentsBatchReader::from_reader(file)?;