mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-06-24 23:48:30 +02:00
update arroy to the latest working version
This commit is contained in:
parent
796f9fdf5b
commit
3852563e79
19
Cargo.lock
generated
19
Cargo.lock
generated
@ -444,11 +444,12 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "arroy"
|
name = "arroy"
|
||||||
version = "0.6.1"
|
version = "0.7.0"
|
||||||
source = "git+https://github.com/meilisearch/arroy.git?rev=5b748bac2c69c65a97980901b02067a3a545e357#5b748bac2c69c65a97980901b02067a3a545e357"
|
source = "git+https://github.com/meilisearch/arroy.git?rev=a63f0979b216dde10d50fdfa4fadcb2b1dea73c7#a63f0979b216dde10d50fdfa4fadcb2b1dea73c7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
|
"crossbeam",
|
||||||
"enum-iterator",
|
"enum-iterator",
|
||||||
"heed",
|
"heed",
|
||||||
"memmap2",
|
"memmap2",
|
||||||
@ -460,6 +461,7 @@ dependencies = [
|
|||||||
"roaring",
|
"roaring",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror 2.0.12",
|
"thiserror 2.0.12",
|
||||||
|
"thread_local",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -1366,6 +1368,19 @@ dependencies = [
|
|||||||
"itertools 0.10.5",
|
"itertools 0.10.5",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam"
|
||||||
|
version = "0.8.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
|
||||||
|
dependencies = [
|
||||||
|
"crossbeam-channel",
|
||||||
|
"crossbeam-deque",
|
||||||
|
"crossbeam-epoch",
|
||||||
|
"crossbeam-queue",
|
||||||
|
"crossbeam-utils",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-channel"
|
name = "crossbeam-channel"
|
||||||
version = "0.5.15"
|
version = "0.5.15"
|
||||||
|
@ -237,7 +237,7 @@ impl IndexScheduler {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
|
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
|
||||||
|
|
||||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
|
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u64);
|
||||||
progress.update_progress(task_progress_obj);
|
progress.update_progress(task_progress_obj);
|
||||||
process_batch_info = info;
|
process_batch_info = info;
|
||||||
let mut success = 0;
|
let mut success = 0;
|
||||||
@ -316,7 +316,7 @@ impl IndexScheduler {
|
|||||||
Err(err) => {
|
Err(err) => {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchFailed);
|
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchFailed);
|
||||||
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
|
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u64);
|
||||||
progress.update_progress(task_progress_obj);
|
progress.update_progress(task_progress_obj);
|
||||||
|
|
||||||
if matches!(err, Error::DatabaseUpgrade(_)) {
|
if matches!(err, Error::DatabaseUpgrade(_)) {
|
||||||
|
@ -346,8 +346,8 @@ impl IndexScheduler {
|
|||||||
for (step, swap) in swaps.iter().enumerate() {
|
for (step, swap) in swaps.iter().enumerate() {
|
||||||
progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
|
progress.update_progress(VariableNameStep::<SwappingTheIndexes>::new(
|
||||||
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
|
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
|
||||||
step as u32,
|
step as u64,
|
||||||
swaps.len() as u32,
|
swaps.len() as u64,
|
||||||
));
|
));
|
||||||
self.apply_index_swap(
|
self.apply_index_swap(
|
||||||
&mut wtxn,
|
&mut wtxn,
|
||||||
@ -425,7 +425,7 @@ impl IndexScheduler {
|
|||||||
// 3. before_name -> new_name in the task's KindWithContent
|
// 3. before_name -> new_name in the task's KindWithContent
|
||||||
progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks);
|
progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks);
|
||||||
let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids;
|
let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids;
|
||||||
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u32);
|
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u64);
|
||||||
progress.update_progress(task_progress);
|
progress.update_progress(task_progress);
|
||||||
|
|
||||||
for task_id in tasks_to_update {
|
for task_id in tasks_to_update {
|
||||||
@ -482,7 +482,7 @@ impl IndexScheduler {
|
|||||||
// The tasks that have been removed *per batches*.
|
// The tasks that have been removed *per batches*.
|
||||||
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
|
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
|
||||||
|
|
||||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u64);
|
||||||
progress.update_progress(task_progress);
|
progress.update_progress(task_progress);
|
||||||
for task_id in to_delete_tasks.iter() {
|
for task_id in to_delete_tasks.iter() {
|
||||||
let task =
|
let task =
|
||||||
@ -528,7 +528,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
|
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
|
||||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(
|
let (atomic_progress, task_progress) = AtomicTaskStep::new(
|
||||||
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
|
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u64,
|
||||||
);
|
);
|
||||||
progress.update_progress(task_progress);
|
progress.update_progress(task_progress);
|
||||||
for index in affected_indexes.iter() {
|
for index in affected_indexes.iter() {
|
||||||
@ -547,7 +547,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
||||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u64);
|
||||||
progress.update_progress(task_progress);
|
progress.update_progress(task_progress);
|
||||||
for task in to_delete_tasks.iter() {
|
for task in to_delete_tasks.iter() {
|
||||||
self.queue.tasks.all_tasks.delete(wtxn, &task)?;
|
self.queue.tasks.all_tasks.delete(wtxn, &task)?;
|
||||||
@ -564,7 +564,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingBatches);
|
progress.update_progress(TaskDeletionProgress::DeletingBatches);
|
||||||
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
|
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u64);
|
||||||
progress.update_progress(batch_progress);
|
progress.update_progress(batch_progress);
|
||||||
for (batch_id, to_delete_tasks) in affected_batches {
|
for (batch_id, to_delete_tasks) in affected_batches {
|
||||||
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
|
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
|
||||||
@ -737,7 +737,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 3. We now have a list of tasks to cancel, cancel them
|
// 3. We now have a list of tasks to cancel, cancel them
|
||||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
|
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u64);
|
||||||
progress.update_progress(progress_obj);
|
progress.update_progress(progress_obj);
|
||||||
|
|
||||||
let mut tasks = self.queue.tasks.get_existing_tasks(
|
let mut tasks = self.queue.tasks.get_existing_tasks(
|
||||||
@ -748,7 +748,7 @@ impl IndexScheduler {
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
progress.update_progress(TaskCancelationProgress::UpdatingTasks);
|
progress.update_progress(TaskCancelationProgress::UpdatingTasks);
|
||||||
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
|
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u64);
|
||||||
progress.update_progress(progress_obj);
|
progress.update_progress(progress_obj);
|
||||||
for task in tasks.iter_mut() {
|
for task in tasks.iter_mut() {
|
||||||
task.status = Status::Canceled;
|
task.status = Status::Canceled;
|
||||||
|
@ -48,7 +48,7 @@ impl IndexScheduler {
|
|||||||
let mut dump_tasks = dump.create_tasks_queue()?;
|
let mut dump_tasks = dump.create_tasks_queue()?;
|
||||||
|
|
||||||
let (atomic, update_task_progress) =
|
let (atomic, update_task_progress) =
|
||||||
AtomicTaskStep::new(self.queue.tasks.all_tasks.len(&rtxn)? as u32);
|
AtomicTaskStep::new(self.queue.tasks.all_tasks.len(&rtxn)? as u64);
|
||||||
progress.update_progress(update_task_progress);
|
progress.update_progress(update_task_progress);
|
||||||
|
|
||||||
for ret in self.queue.tasks.all_tasks.iter(&rtxn)? {
|
for ret in self.queue.tasks.all_tasks.iter(&rtxn)? {
|
||||||
@ -110,7 +110,7 @@ impl IndexScheduler {
|
|||||||
let mut dump_batches = dump.create_batches_queue()?;
|
let mut dump_batches = dump.create_batches_queue()?;
|
||||||
|
|
||||||
let (atomic_batch_progress, update_batch_progress) =
|
let (atomic_batch_progress, update_batch_progress) =
|
||||||
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
|
AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u64);
|
||||||
progress.update_progress(update_batch_progress);
|
progress.update_progress(update_batch_progress);
|
||||||
|
|
||||||
for ret in self.queue.batches.all_batches.iter(&rtxn)? {
|
for ret in self.queue.batches.all_batches.iter(&rtxn)? {
|
||||||
@ -140,7 +140,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
// 4. Dump the indexes
|
// 4. Dump the indexes
|
||||||
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
|
progress.update_progress(DumpCreationProgress::DumpTheIndexes);
|
||||||
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
|
let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u64;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
|
let () = self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
|
||||||
progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
|
progress.update_progress(VariableNameStep::<DumpCreationProgress>::new(
|
||||||
@ -172,7 +172,7 @@ impl IndexScheduler {
|
|||||||
let nb_documents = index
|
let nb_documents = index
|
||||||
.number_of_documents(&rtxn)
|
.number_of_documents(&rtxn)
|
||||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
|
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
|
||||||
as u32;
|
as u64;
|
||||||
let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents);
|
let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents);
|
||||||
progress.update_progress(update_document_progress);
|
progress.update_progress(update_document_progress);
|
||||||
let documents = index
|
let documents = index
|
||||||
|
@ -58,7 +58,7 @@ impl IndexScheduler {
|
|||||||
// 2.4 Only copy the update files of the enqueued tasks
|
// 2.4 Only copy the update files of the enqueued tasks
|
||||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
|
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
|
||||||
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u64);
|
||||||
progress.update_progress(update_file_progress);
|
progress.update_progress(update_file_progress);
|
||||||
for task_id in enqueued {
|
for task_id in enqueued {
|
||||||
let task =
|
let task =
|
||||||
@ -74,12 +74,12 @@ impl IndexScheduler {
|
|||||||
// 3. Snapshot every indexes
|
// 3. Snapshot every indexes
|
||||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
|
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
|
||||||
let index_mapping = self.index_mapper.index_mapping;
|
let index_mapping = self.index_mapper.index_mapping;
|
||||||
let nb_indexes = index_mapping.len(&rtxn)? as u32;
|
let nb_indexes = index_mapping.len(&rtxn)? as u64;
|
||||||
|
|
||||||
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
|
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
|
||||||
let (name, uuid) = result?;
|
let (name, uuid) = result?;
|
||||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||||
name, i as u32, nb_indexes,
|
name, i as u64, nb_indexes,
|
||||||
));
|
));
|
||||||
let index = self.index_mapper.index(&rtxn, name)?;
|
let index = self.index_mapper.index(&rtxn, name)?;
|
||||||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||||
|
@ -22,8 +22,8 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
||||||
format!("Upgrading index `{uid}`"),
|
format!("Upgrading index `{uid}`"),
|
||||||
i as u32,
|
i as u64,
|
||||||
indexes.len() as u32,
|
indexes.len() as u64,
|
||||||
));
|
));
|
||||||
let index = self.index(uid)?;
|
let index = self.index(uid)?;
|
||||||
let mut index_wtxn = index.write_txn()?;
|
let mut index_wtxn = index.write_txn()?;
|
||||||
@ -65,8 +65,8 @@ impl IndexScheduler {
|
|||||||
for (i, uid) in indexes.iter().enumerate() {
|
for (i, uid) in indexes.iter().enumerate() {
|
||||||
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
|
||||||
format!("Rollbacking index `{uid}`"),
|
format!("Rollbacking index `{uid}`"),
|
||||||
i as u32,
|
i as u64,
|
||||||
indexes.len() as u32,
|
indexes.len() as u64,
|
||||||
));
|
));
|
||||||
let index_schd_rtxn = self.env.read_txn()?;
|
let index_schd_rtxn = self.env.read_txn()?;
|
||||||
|
|
||||||
|
@ -162,8 +162,8 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
let (uid, uuid) = result?;
|
let (uid, uuid) = result?;
|
||||||
progress.update_progress(VariableNameStep::new(
|
progress.update_progress(VariableNameStep::new(
|
||||||
&uid,
|
&uid,
|
||||||
index_index as u32,
|
index_index as u64,
|
||||||
index_count as u32,
|
index_count as u64,
|
||||||
));
|
));
|
||||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
@ -220,12 +220,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
|||||||
|
|
||||||
pub struct VariableNameStep {
|
pub struct VariableNameStep {
|
||||||
name: String,
|
name: String,
|
||||||
current: u32,
|
current: u64,
|
||||||
total: u32,
|
total: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VariableNameStep {
|
impl VariableNameStep {
|
||||||
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
pub fn new(name: impl Into<String>, current: u64, total: u64) -> Self {
|
||||||
Self { name: name.into(), current, total }
|
Self { name: name.into(), current, total }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -235,11 +235,11 @@ impl Step for VariableNameStep {
|
|||||||
self.name.clone().into()
|
self.name.clone().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
fn current(&self) -> u64 {
|
||||||
self.current
|
self.current
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
fn total(&self) -> u64 {
|
||||||
self.total
|
self.total
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,7 +88,7 @@ rhai = { version = "1.22.2", features = [
|
|||||||
"sync",
|
"sync",
|
||||||
] }
|
] }
|
||||||
# arroy = "0.6.1"
|
# arroy = "0.6.1"
|
||||||
arroy = { git = "https://github.com/meilisearch/arroy.git", rev = "5b748bac2c69c65a97980901b02067a3a545e357" } # incremental update
|
arroy = { git = "https://github.com/meilisearch/arroy.git", rev = "a63f0979b216dde10d50fdfa4fadcb2b1dea73c7" } # incremental update
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
tracing = "0.1.41"
|
tracing = "0.1.41"
|
||||||
ureq = { version = "2.12.1", features = ["json"] }
|
ureq = { version = "2.12.1", features = ["json"] }
|
||||||
|
@ -409,18 +409,19 @@ impl From<arroy::Error> for Error {
|
|||||||
arroy::Error::Heed(heed) => heed.into(),
|
arroy::Error::Heed(heed) => heed.into(),
|
||||||
arroy::Error::Io(io) => io.into(),
|
arroy::Error::Io(io) => io.into(),
|
||||||
arroy::Error::InvalidVecDimension { expected, received } => {
|
arroy::Error::InvalidVecDimension { expected, received } => {
|
||||||
Error::UserError(UserError::InvalidVectorDimensions { expected, found: received })
|
Error::UserError(UserError::InvalidVectorDimensions { expected, found: received })
|
||||||
}
|
}
|
||||||
arroy::Error::BuildCancelled => Error::InternalError(InternalError::AbortedIndexation),
|
arroy::Error::BuildCancelled => Error::InternalError(InternalError::AbortedIndexation),
|
||||||
arroy::Error::DatabaseFull
|
arroy::Error::DatabaseFull
|
||||||
| arroy::Error::InvalidItemAppend
|
| arroy::Error::InvalidItemAppend
|
||||||
| arroy::Error::UnmatchingDistance { .. }
|
| arroy::Error::UnmatchingDistance { .. }
|
||||||
| arroy::Error::NeedBuild(_)
|
| arroy::Error::NeedBuild(_)
|
||||||
| arroy::Error::MissingKey { .. }
|
| arroy::Error::MissingKey { .. }
|
||||||
| arroy::Error::MissingMetadata(_)
|
| arroy::Error::MissingMetadata(_)
|
||||||
| arroy::Error::CannotDecodeKeyMode { .. } => {
|
| arroy::Error::CannotDecodeKeyMode { .. }
|
||||||
Error::InternalError(InternalError::ArroyError(value))
|
| arroy::Error::UnknownVersion { .. } => {
|
||||||
}
|
Error::InternalError(InternalError::ArroyError(value))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use std::any::TypeId;
|
use std::any::TypeId;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::atomic::{AtomicU32, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
@ -13,8 +13,8 @@ use utoipa::ToSchema;
|
|||||||
|
|
||||||
pub trait Step: 'static + Send + Sync {
|
pub trait Step: 'static + Send + Sync {
|
||||||
fn name(&self) -> Cow<'static, str>;
|
fn name(&self) -> Cow<'static, str>;
|
||||||
fn current(&self) -> u32;
|
fn current(&self) -> u64;
|
||||||
fn total(&self) -> u32;
|
fn total(&self) -> u64;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
@ -113,13 +113,13 @@ pub trait NamedStep: 'static + Send + Sync + Default {
|
|||||||
/// - The total number of steps doesn't change
|
/// - The total number of steps doesn't change
|
||||||
pub struct AtomicSubStep<Name: NamedStep> {
|
pub struct AtomicSubStep<Name: NamedStep> {
|
||||||
unit_name: Name,
|
unit_name: Name,
|
||||||
current: Arc<AtomicU32>,
|
current: Arc<AtomicU64>,
|
||||||
total: u32,
|
total: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Name: NamedStep> AtomicSubStep<Name> {
|
impl<Name: NamedStep> AtomicSubStep<Name> {
|
||||||
pub fn new(total: u32) -> (Arc<AtomicU32>, Self) {
|
pub fn new(total: u64) -> (Arc<AtomicU64>, Self) {
|
||||||
let current = Arc::new(AtomicU32::new(0));
|
let current = Arc::new(AtomicU64::new(0));
|
||||||
(current.clone(), Self { current, total, unit_name: Name::default() })
|
(current.clone(), Self { current, total, unit_name: Name::default() })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -129,11 +129,11 @@ impl<Name: NamedStep> Step for AtomicSubStep<Name> {
|
|||||||
self.unit_name.name().into()
|
self.unit_name.name().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
fn current(&self) -> u64 {
|
||||||
self.current.load(Ordering::Relaxed)
|
self.current.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
fn total(&self) -> u64 {
|
||||||
self.total
|
self.total
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -164,13 +164,13 @@ macro_rules! make_enum_progress {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
fn current(&self) -> u64 {
|
||||||
*self as u32
|
*self as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
fn total(&self) -> u64 {
|
||||||
use $crate::progress::_private_enum_iterator::Sequence;
|
use $crate::progress::_private_enum_iterator::Sequence;
|
||||||
Self::CARDINALITY as u32
|
Self::CARDINALITY as u64
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -216,8 +216,8 @@ pub struct ProgressView {
|
|||||||
#[schema(rename_all = "camelCase")]
|
#[schema(rename_all = "camelCase")]
|
||||||
pub struct ProgressStepView {
|
pub struct ProgressStepView {
|
||||||
pub current_step: Cow<'static, str>,
|
pub current_step: Cow<'static, str>,
|
||||||
pub finished: u32,
|
pub finished: u64,
|
||||||
pub total: u32,
|
pub total: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Used when the name can change but it's still the same step.
|
/// Used when the name can change but it's still the same step.
|
||||||
@ -233,13 +233,13 @@ pub struct ProgressStepView {
|
|||||||
/// ```
|
/// ```
|
||||||
pub struct VariableNameStep<U: Send + Sync + 'static> {
|
pub struct VariableNameStep<U: Send + Sync + 'static> {
|
||||||
name: String,
|
name: String,
|
||||||
current: u32,
|
current: u64,
|
||||||
total: u32,
|
total: u64,
|
||||||
phantom: PhantomData<U>,
|
phantom: PhantomData<U>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<U: Send + Sync + 'static> VariableNameStep<U> {
|
impl<U: Send + Sync + 'static> VariableNameStep<U> {
|
||||||
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
|
pub fn new(name: impl Into<String>, current: u64, total: u64) -> Self {
|
||||||
Self { name: name.into(), current, total, phantom: PhantomData }
|
Self { name: name.into(), current, total, phantom: PhantomData }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -249,11 +249,11 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
|
|||||||
self.name.clone().into()
|
self.name.clone().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
fn current(&self) -> u64 {
|
||||||
self.current
|
self.current
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
fn total(&self) -> u64 {
|
||||||
self.total
|
self.total
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -263,8 +263,8 @@ impl Step for arroy::MainStep {
|
|||||||
match self {
|
match self {
|
||||||
arroy::MainStep::PreProcessingTheItems => "pre processing the items",
|
arroy::MainStep::PreProcessingTheItems => "pre processing the items",
|
||||||
arroy::MainStep::WritingTheDescendantsAndMetadata => {
|
arroy::MainStep::WritingTheDescendantsAndMetadata => {
|
||||||
"writing the descendants and metadata"
|
"writing the descendants and metadata"
|
||||||
}
|
}
|
||||||
arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
|
arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
|
||||||
arroy::MainStep::WriteTheMetadata => "write the metadata",
|
arroy::MainStep::WriteTheMetadata => "write the metadata",
|
||||||
arroy::MainStep::RetrievingTheItemsIds => "retrieving the items ids",
|
arroy::MainStep::RetrievingTheItemsIds => "retrieving the items ids",
|
||||||
@ -272,19 +272,20 @@ impl Step for arroy::MainStep {
|
|||||||
arroy::MainStep::DeletingExtraTrees => "deleting extra trees",
|
arroy::MainStep::DeletingExtraTrees => "deleting extra trees",
|
||||||
arroy::MainStep::RemoveItemsFromExistingTrees => "remove items from existing trees",
|
arroy::MainStep::RemoveItemsFromExistingTrees => "remove items from existing trees",
|
||||||
arroy::MainStep::InsertItemsInCurrentTrees => "insert items in current trees",
|
arroy::MainStep::InsertItemsInCurrentTrees => "insert items in current trees",
|
||||||
arroy::MainStep::IncrementalIndexLargeDescendants => {
|
arroy::MainStep::RetrievingTheItems => "retrieving the items",
|
||||||
"incremental index large descendants"
|
arroy::MainStep::RetrievingTheTreeNodes => "retrieving the tree nodes",
|
||||||
}
|
arroy::MainStep::RetrieveTheLargeDescendants => "retrieve the large descendants",
|
||||||
|
arroy::MainStep::CreateTreesForItems => "create trees for items",
|
||||||
}
|
}
|
||||||
.into()
|
.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
fn current(&self) -> u64 {
|
||||||
*self as u32
|
*self as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
fn total(&self) -> u64 {
|
||||||
Self::CARDINALITY as u32
|
Self::CARDINALITY as u64
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,11 +294,11 @@ impl Step for arroy::SubStep {
|
|||||||
self.unit.into()
|
self.unit.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current(&self) -> u32 {
|
fn current(&self) -> u64 {
|
||||||
self.current.load(Ordering::Relaxed)
|
self.current.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn total(&self) -> u32 {
|
fn total(&self) -> u64 {
|
||||||
self.max
|
self.max
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ where
|
|||||||
extractor_alloc.0.reset();
|
extractor_alloc.0.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
let total_documents = document_changes.len() as u32;
|
let total_documents = document_changes.len() as u64;
|
||||||
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
|
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
|
||||||
progress.update_progress(progress_step);
|
progress.update_progress(progress_step);
|
||||||
|
|
||||||
@ -249,7 +249,7 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let res = extractor.process(changes, context).map_err(Arc::new);
|
let res = extractor.process(changes, context).map_err(Arc::new);
|
||||||
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
|
step.fetch_add(items.as_ref().len() as u64, Ordering::Relaxed);
|
||||||
|
|
||||||
// send back the doc_alloc in the pool
|
// send back the doc_alloc in the pool
|
||||||
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
|
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
|
||||||
|
@ -85,14 +85,14 @@ impl<'pl> DocumentOperation<'pl> {
|
|||||||
let mut primary_key = None;
|
let mut primary_key = None;
|
||||||
|
|
||||||
let payload_count = operations.len();
|
let payload_count = operations.len();
|
||||||
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
|
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u64);
|
||||||
progress.update_progress(progress_step);
|
progress.update_progress(progress_step);
|
||||||
|
|
||||||
for (payload_index, operation) in operations.into_iter().enumerate() {
|
for (payload_index, operation) in operations.into_iter().enumerate() {
|
||||||
if must_stop_processing() {
|
if must_stop_processing() {
|
||||||
return Err(InternalError::AbortedIndexation.into());
|
return Err(InternalError::AbortedIndexation.into());
|
||||||
}
|
}
|
||||||
step.store(payload_index as u32, Ordering::Relaxed);
|
step.store(payload_index as u64, Ordering::Relaxed);
|
||||||
|
|
||||||
let mut bytes = 0;
|
let mut bytes = 0;
|
||||||
let result = match operation {
|
let result = match operation {
|
||||||
@ -145,7 +145,7 @@ impl<'pl> DocumentOperation<'pl> {
|
|||||||
};
|
};
|
||||||
operations_stats.push(PayloadStats { document_count, bytes, error });
|
operations_stats.push(PayloadStats { document_count, bytes, error });
|
||||||
}
|
}
|
||||||
step.store(payload_count as u32, Ordering::Relaxed);
|
step.store(payload_count as u64, Ordering::Relaxed);
|
||||||
|
|
||||||
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
|
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
|
||||||
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
|
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
|
||||||
|
@ -11,7 +11,7 @@ pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progre
|
|||||||
let field_id_map = index.fields_ids_map(wtxn)?;
|
let field_id_map = index.fields_ids_map(wtxn)?;
|
||||||
|
|
||||||
let (update_document_count, sub_step) =
|
let (update_document_count, sub_step) =
|
||||||
AtomicSubStep::<progress::Document>::new(document_count as u32);
|
AtomicSubStep::<progress::Document>::new(document_count as u64);
|
||||||
progress.update_progress(sub_step);
|
progress.update_progress(sub_step);
|
||||||
|
|
||||||
let docids = index.documents_ids(wtxn)?;
|
let docids = index.documents_ids(wtxn)?;
|
||||||
|
@ -81,8 +81,8 @@ where
|
|||||||
target.1,
|
target.1,
|
||||||
target.2
|
target.2
|
||||||
),
|
),
|
||||||
i as u32,
|
i as u64,
|
||||||
upgrade_path.len() as u32,
|
upgrade_path.len() as u64,
|
||||||
));
|
));
|
||||||
regenerate_stats |= upgrade.upgrade(wtxn, index, from, progress.clone())?;
|
regenerate_stats |= upgrade.upgrade(wtxn, index, from, progress.clone())?;
|
||||||
index.put_version(wtxn, target)?;
|
index.put_version(wtxn, target)?;
|
||||||
|
@ -83,7 +83,7 @@ impl ArroyWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
|
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng + Send + Sync>(
|
||||||
&mut self,
|
&mut self,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
progress: &Progress,
|
progress: &Progress,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user