mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-04 18:45:46 +01:00
make the tests compile again
This commit is contained in:
parent
0c68b9ed4c
commit
966cbdab69
@ -340,7 +340,8 @@ mod tests {
|
|||||||
impl IndexMapper {
|
impl IndexMapper {
|
||||||
fn test() -> (Self, Env, IndexSchedulerHandle) {
|
fn test() -> (Self, Env, IndexSchedulerHandle) {
|
||||||
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
|
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
|
||||||
(index_scheduler.index_mapper, index_scheduler.env, handle)
|
let index_scheduler = index_scheduler.inner();
|
||||||
|
(index_scheduler.index_mapper.clone(), index_scheduler.env.clone(), handle)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
|
use std::ops::Deref;
|
||||||
|
|
||||||
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||||
use meilisearch_types::heed::{Database, RoTxn};
|
use meilisearch_types::heed::{Database, RoTxn};
|
||||||
@ -8,12 +9,13 @@ use meilisearch_types::tasks::{Details, Task};
|
|||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use crate::index_mapper::IndexMapper;
|
use crate::index_mapper::IndexMapper;
|
||||||
use crate::{IndexScheduler, Kind, Status, BEI128};
|
use crate::{IndexScheduler, IndexSchedulerInner, Kind, Status, BEI128};
|
||||||
|
|
||||||
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||||
scheduler.assert_internally_consistent();
|
scheduler.assert_internally_consistent();
|
||||||
|
|
||||||
let IndexScheduler {
|
let inner = scheduler.inner();
|
||||||
|
let IndexSchedulerInner {
|
||||||
autobatching_enabled,
|
autobatching_enabled,
|
||||||
must_stop_processing: _,
|
must_stop_processing: _,
|
||||||
processing_tasks,
|
processing_tasks,
|
||||||
@ -39,13 +41,13 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
|||||||
planned_failures: _,
|
planned_failures: _,
|
||||||
run_loop_iteration: _,
|
run_loop_iteration: _,
|
||||||
zookeeper: _,
|
zookeeper: _,
|
||||||
} = scheduler;
|
} = inner.deref();
|
||||||
|
|
||||||
let rtxn = env.read_txn().unwrap();
|
let rtxn = env.read_txn().unwrap();
|
||||||
|
|
||||||
let mut snap = String::new();
|
let mut snap = String::new();
|
||||||
|
|
||||||
let processing_tasks = processing_tasks.read().unwrap().processing.clone();
|
let processing_tasks = processing_tasks.read().processing.clone();
|
||||||
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
|
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
|
||||||
snap.push_str("### Processing Tasks:\n");
|
snap.push_str("### Processing Tasks:\n");
|
||||||
snap.push_str(&snapshot_bitmap(&processing_tasks));
|
snap.push_str(&snapshot_bitmap(&processing_tasks));
|
||||||
|
@ -355,7 +355,7 @@ impl IndexScheduler {
|
|||||||
/// only once per index scheduler.
|
/// only once per index scheduler.
|
||||||
fn run(&self) {
|
fn run(&self) {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.breakpoint(Breakpoint::Init);
|
self.inner().breakpoint(Breakpoint::Init);
|
||||||
|
|
||||||
let latch = match self.zookeeper {
|
let latch = match self.zookeeper {
|
||||||
Some(ref zookeeper) => {
|
Some(ref zookeeper) => {
|
||||||
@ -891,30 +891,6 @@ impl IndexScheduler {
|
|||||||
pub fn features(&self) -> Result<RoFeatures> {
|
pub fn features(&self) -> Result<RoFeatures> {
|
||||||
self.inner().features()
|
self.inner().features()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Blocks the thread until the test handle asks to progress to/through this breakpoint.
|
|
||||||
///
|
|
||||||
/// Two messages are sent through the channel for each breakpoint.
|
|
||||||
/// The first message is `(b, false)` and the second message is `(b, true)`.
|
|
||||||
///
|
|
||||||
/// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other.
|
|
||||||
/// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks
|
|
||||||
/// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough.
|
|
||||||
/// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`.
|
|
||||||
/// As soon as we find it, the index scheduler is unblocked but then wait again on the call to
|
|
||||||
/// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the
|
|
||||||
/// test asks to progress to the next `(b2, false)`.
|
|
||||||
#[cfg(test)]
|
|
||||||
fn breakpoint(&self, b: Breakpoint) {
|
|
||||||
// We send two messages. The first one will sync with the call
|
|
||||||
// to `handle.wait_until(b)`. The second one will block until the
|
|
||||||
// the next call to `handle.wait_until(..)`.
|
|
||||||
self.test_breakpoint_sdr.send((b, false)).unwrap();
|
|
||||||
// This one will only be able to be sent if the test handle stays alive.
|
|
||||||
// If it fails, then it means that we have exited the test.
|
|
||||||
// By crashing with `unwrap`, we kill the run loop.
|
|
||||||
self.test_breakpoint_sdr.send((b, true)).unwrap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is the internal structure that keeps the indexes alive.
|
/// This is the internal structure that keeps the indexes alive.
|
||||||
@ -1187,7 +1163,7 @@ impl IndexSchedulerInner {
|
|||||||
fn tick(&self) -> Result<TickOutcome> {
|
fn tick(&self) -> Result<TickOutcome> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
{
|
{
|
||||||
*self.run_loop_iteration.write().unwrap() += 1;
|
*self.run_loop_iteration.write() += 1;
|
||||||
self.breakpoint(Breakpoint::Start);
|
self.breakpoint(Breakpoint::Start);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1728,6 +1704,30 @@ impl IndexSchedulerInner {
|
|||||||
let index = self.index_mapper.create_index(wtxn, name, date)?;
|
let index = self.index_mapper.create_index(wtxn, name, date)?;
|
||||||
Ok(index)
|
Ok(index)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Blocks the thread until the test handle asks to progress to/through this breakpoint.
|
||||||
|
///
|
||||||
|
/// Two messages are sent through the channel for each breakpoint.
|
||||||
|
/// The first message is `(b, false)` and the second message is `(b, true)`.
|
||||||
|
///
|
||||||
|
/// Since the channel has a capacity of zero, the `send` and `recv` calls wait for each other.
|
||||||
|
/// So when the index scheduler calls `test_breakpoint_sdr.send(b, false)`, it blocks
|
||||||
|
/// the thread until the test catches up by calling `test_breakpoint_rcv.recv()` enough.
|
||||||
|
/// From the test side, we call `recv()` repeatedly until we find the message `(breakpoint, false)`.
|
||||||
|
/// As soon as we find it, the index scheduler is unblocked but then wait again on the call to
|
||||||
|
/// `test_breakpoint_sdr.send(b, true)`. This message will only be able to send once the
|
||||||
|
/// test asks to progress to the next `(b2, false)`.
|
||||||
|
#[cfg(test)]
|
||||||
|
fn breakpoint(&self, b: Breakpoint) {
|
||||||
|
// We send two messages. The first one will sync with the call
|
||||||
|
// to `handle.wait_until(b)`. The second one will block until the
|
||||||
|
// the next call to `handle.wait_until(..)`.
|
||||||
|
self.test_breakpoint_sdr.send((b, false)).unwrap();
|
||||||
|
// This one will only be able to be sent if the test handle stays alive.
|
||||||
|
// If it fails, then it means that we have exited the test.
|
||||||
|
// By crashing with `unwrap`, we kill the run loop.
|
||||||
|
self.test_breakpoint_sdr.send((b, true)).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Dump<'a> {
|
pub struct Dump<'a> {
|
||||||
@ -2043,12 +2043,13 @@ mod tests {
|
|||||||
|
|
||||||
(index_scheduler, index_scheduler_handle)
|
(index_scheduler, index_scheduler_handle)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexSchedulerInner {
|
||||||
/// Return a [`PlannedFailure`](Error::PlannedFailure) error if a failure is planned
|
/// Return a [`PlannedFailure`](Error::PlannedFailure) error if a failure is planned
|
||||||
/// for the given location and current run loop iteration.
|
/// for the given location and current run loop iteration.
|
||||||
pub fn maybe_fail(&self, location: FailureLocation) -> Result<()> {
|
pub fn maybe_fail(&self, location: FailureLocation) -> Result<()> {
|
||||||
if self.planned_failures.contains(&(*self.run_loop_iteration.read().unwrap(), location))
|
if self.planned_failures.contains(&(*self.run_loop_iteration.read(), location)) {
|
||||||
{
|
|
||||||
match location {
|
match location {
|
||||||
FailureLocation::PanicInsideProcessBatch => {
|
FailureLocation::PanicInsideProcessBatch => {
|
||||||
panic!("simulated panic")
|
panic!("simulated panic")
|
||||||
@ -3235,46 +3236,45 @@ mod tests {
|
|||||||
handle.advance_n_successful_batches(3);
|
handle.advance_n_successful_batches(3);
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processed_all_tasks");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processed_all_tasks");
|
||||||
|
|
||||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
|
||||||
let query = Query { limit: Some(0), ..Default::default() };
|
let query = Query { limit: Some(0), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
|
|
||||||
let query = Query { limit: Some(1), ..Default::default() };
|
let query = Query { limit: Some(1), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
let query = Query { limit: Some(2), ..Default::default() };
|
let query = Query { limit: Some(2), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
|
||||||
|
|
||||||
let query = Query { from: Some(1), ..Default::default() };
|
let query = Query { from: Some(1), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
||||||
|
|
||||||
let query = Query { from: Some(2), ..Default::default() };
|
let query = Query { from: Some(2), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
|
||||||
|
|
||||||
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
|
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
|
||||||
|
|
||||||
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
|
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
||||||
}
|
}
|
||||||
@ -3297,17 +3297,15 @@ mod tests {
|
|||||||
|
|
||||||
handle.advance_till([Start, BatchCreated]);
|
handle.advance_till([Start, BatchCreated]);
|
||||||
|
|
||||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
|
||||||
|
|
||||||
let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() };
|
let query = Query { statuses: Some(vec![Status::Processing]), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
|
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
|
||||||
|
|
||||||
let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() };
|
let query = Query { statuses: Some(vec![Status::Enqueued]), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
|
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
|
||||||
|
|
||||||
@ -3316,7 +3314,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
|
||||||
|
|
||||||
@ -3326,7 +3324,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the test, which should excludes the enqueued tasks
|
// that comes after the start of the test, which should excludes the enqueued tasks
|
||||||
@ -3338,7 +3336,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes before the start of the test, which should excludes all of them
|
// that comes before the start of the test, which should excludes all of them
|
||||||
@ -3351,7 +3349,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the test and before one minute after the start of the test,
|
// that comes after the start of the test and before one minute after the start of the test,
|
||||||
@ -3367,8 +3365,6 @@ mod tests {
|
|||||||
BatchCreated,
|
BatchCreated,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
|
||||||
|
|
||||||
let second_start_time = OffsetDateTime::now_utc();
|
let second_start_time = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
let query = Query {
|
let query = Query {
|
||||||
@ -3378,7 +3374,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the test and before one minute after the start of the test,
|
// that comes after the start of the test and before one minute after the start of the test,
|
||||||
@ -3391,7 +3387,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes before the start of the test, which should exclude all tasks
|
// that comes before the start of the test, which should exclude all tasks
|
||||||
@ -3404,7 +3400,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the second part of the test and before one minute after the
|
// that comes after the start of the second part of the test and before one minute after the
|
||||||
@ -3421,10 +3417,8 @@ mod tests {
|
|||||||
BatchCreated,
|
BatchCreated,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
|
||||||
|
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// we run the same query to verify that, and indeed find that the last task is matched
|
// we run the same query to verify that, and indeed find that the last task is matched
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
@ -3436,19 +3430,18 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// enqueued, succeeded, or processing tasks started after the second part of the test, should
|
// enqueued, succeeded, or processing tasks started after the second part of the test, should
|
||||||
// again only return the last task
|
// again only return the last task
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
handle.advance_till([ProcessBatchFailed, AfterProcessing]);
|
handle.advance_till([ProcessBatchFailed, AfterProcessing]);
|
||||||
let rtxn = index_scheduler.read_txn().unwrap();
|
|
||||||
|
|
||||||
// now the last task should have failed
|
// now the last task should have failed
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// so running the last query should return nothing
|
// so running the last query should return nothing
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
@ -3460,7 +3453,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// but the same query on failed tasks should return the last task
|
// but the same query on failed tasks should return the last task
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
@ -3472,7 +3465,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// but the same query on failed tasks should return the last task
|
// but the same query on failed tasks should return the last task
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
@ -3485,7 +3478,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// same query but with an invalid uid
|
// same query but with an invalid uid
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
@ -3498,7 +3491,7 @@ mod tests {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// same query but with a valid uid
|
// same query but with a valid uid
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
@ -3526,11 +3519,9 @@ mod tests {
|
|||||||
|
|
||||||
handle.advance_till([Start, BatchCreated]);
|
handle.advance_till([Start, BatchCreated]);
|
||||||
|
|
||||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
|
||||||
|
|
||||||
let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() };
|
let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// only the first task associated with catto is returned, the indexSwap tasks are excluded!
|
// only the first task associated with catto is returned, the indexSwap tasks are excluded!
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
||||||
@ -3538,7 +3529,6 @@ mod tests {
|
|||||||
let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() };
|
let query = Query { index_uids: Some(vec!["catto".to_owned()]), ..Default::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(
|
.get_task_ids_from_authorized_indexes(
|
||||||
&rtxn,
|
|
||||||
&query,
|
&query,
|
||||||
&AuthFilter::with_allowed_indexes(
|
&AuthFilter::with_allowed_indexes(
|
||||||
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
|
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
|
||||||
@ -3552,7 +3542,6 @@ mod tests {
|
|||||||
let query = Query::default();
|
let query = Query::default();
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(
|
.get_task_ids_from_authorized_indexes(
|
||||||
&rtxn,
|
|
||||||
&query,
|
&query,
|
||||||
&AuthFilter::with_allowed_indexes(
|
&AuthFilter::with_allowed_indexes(
|
||||||
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
|
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
|
||||||
@ -3566,7 +3555,6 @@ mod tests {
|
|||||||
let query = Query::default();
|
let query = Query::default();
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(
|
.get_task_ids_from_authorized_indexes(
|
||||||
&rtxn,
|
|
||||||
&query,
|
&query,
|
||||||
&AuthFilter::with_allowed_indexes(
|
&AuthFilter::with_allowed_indexes(
|
||||||
vec![
|
vec![
|
||||||
@ -3584,7 +3572,7 @@ mod tests {
|
|||||||
|
|
||||||
let query = Query::default();
|
let query = Query::default();
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// we asked for all the tasks with all index authorized -> all tasks returned
|
// we asked for all the tasks with all index authorized -> all tasks returned
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]");
|
||||||
@ -3614,10 +3602,9 @@ mod tests {
|
|||||||
|
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
|
||||||
|
|
||||||
let rtxn = index_scheduler.read_txn().unwrap();
|
|
||||||
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
|
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
|
.get_task_ids_from_authorized_indexes(&query, &AuthFilter::default())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// 0 is not returned because it was not canceled, 3 is not returned because it is the uid of the
|
// 0 is not returned because it was not canceled, 3 is not returned because it is the uid of the
|
||||||
// taskCancelation itself
|
// taskCancelation itself
|
||||||
@ -3626,7 +3613,6 @@ mod tests {
|
|||||||
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
|
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
|
||||||
let (tasks, _) = index_scheduler
|
let (tasks, _) = index_scheduler
|
||||||
.get_task_ids_from_authorized_indexes(
|
.get_task_ids_from_authorized_indexes(
|
||||||
&rtxn,
|
|
||||||
&query,
|
&query,
|
||||||
&AuthFilter::with_allowed_indexes(
|
&AuthFilter::with_allowed_indexes(
|
||||||
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
|
vec![IndexUidPattern::new_unchecked("doggo")].into_iter().collect(),
|
||||||
@ -3650,7 +3636,7 @@ mod tests {
|
|||||||
handle.advance_one_failed_batch();
|
handle.advance_one_failed_batch();
|
||||||
|
|
||||||
// Still in the first iteration
|
// Still in the first iteration
|
||||||
assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1);
|
assert_eq!(*index_scheduler.inner().run_loop_iteration.read(), 1);
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4442,7 +4428,7 @@ mod tests {
|
|||||||
handle.advance_till([Start, BatchCreated, ProcessBatchFailed, AfterProcessing]);
|
handle.advance_till([Start, BatchCreated, ProcessBatchFailed, AfterProcessing]);
|
||||||
|
|
||||||
// Still in the first iteration
|
// Still in the first iteration
|
||||||
assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 1);
|
assert_eq!(*index_scheduler.inner().run_loop_iteration.read(), 1);
|
||||||
// No matter what happens in process_batch, the index_scheduler should be internally consistent
|
// No matter what happens in process_batch, the index_scheduler should be internally consistent
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "index_creation_failed");
|
||||||
}
|
}
|
||||||
@ -4530,6 +4516,7 @@ mod tests {
|
|||||||
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
.register(KindWithContent::IndexCreation { index_uid: S("doggo"), primary_key: None })
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let index_scheduler = index_scheduler.inner();
|
||||||
let rtxn = index_scheduler.env.read_txn().unwrap();
|
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||||
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
let tasks = index_scheduler.get_task_ids(&rtxn, &Query { ..Default::default() }).unwrap();
|
||||||
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
let tasks = index_scheduler.get_existing_tasks(&rtxn, tasks).unwrap();
|
||||||
|
@ -331,11 +331,12 @@ pub fn clamp_to_page_size(size: usize) -> usize {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl IndexScheduler {
|
impl crate::IndexScheduler {
|
||||||
/// Asserts that the index scheduler's content is internally consistent.
|
/// Asserts that the index scheduler's content is internally consistent.
|
||||||
pub fn assert_internally_consistent(&self) {
|
pub fn assert_internally_consistent(&self) {
|
||||||
let rtxn = self.env.read_txn().unwrap();
|
let this = self.inner();
|
||||||
for task in self.all_tasks.iter(&rtxn).unwrap() {
|
let rtxn = this.env.read_txn().unwrap();
|
||||||
|
for task in this.all_tasks.iter(&rtxn).unwrap() {
|
||||||
let (task_id, task) = task.unwrap();
|
let (task_id, task) = task.unwrap();
|
||||||
let task_id = task_id.get();
|
let task_id = task_id.get();
|
||||||
|
|
||||||
@ -354,21 +355,21 @@ impl IndexScheduler {
|
|||||||
} = task;
|
} = task;
|
||||||
assert_eq!(uid, task.uid);
|
assert_eq!(uid, task.uid);
|
||||||
if let Some(task_index_uid) = &task_index_uid {
|
if let Some(task_index_uid) = &task_index_uid {
|
||||||
assert!(self
|
assert!(this
|
||||||
.index_tasks
|
.index_tasks
|
||||||
.get(&rtxn, task_index_uid.as_str())
|
.get(&rtxn, task_index_uid.as_str())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.contains(task.uid));
|
.contains(task.uid));
|
||||||
}
|
}
|
||||||
let db_enqueued_at = self
|
let db_enqueued_at = this
|
||||||
.enqueued_at
|
.enqueued_at
|
||||||
.get(&rtxn, &BEI128::new(enqueued_at.unix_timestamp_nanos()))
|
.get(&rtxn, &BEI128::new(enqueued_at.unix_timestamp_nanos()))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(db_enqueued_at.contains(task_id));
|
assert!(db_enqueued_at.contains(task_id));
|
||||||
if let Some(started_at) = started_at {
|
if let Some(started_at) = started_at {
|
||||||
let db_started_at = self
|
let db_started_at = this
|
||||||
.started_at
|
.started_at
|
||||||
.get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos()))
|
.get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos()))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -376,7 +377,7 @@ impl IndexScheduler {
|
|||||||
assert!(db_started_at.contains(task_id));
|
assert!(db_started_at.contains(task_id));
|
||||||
}
|
}
|
||||||
if let Some(finished_at) = finished_at {
|
if let Some(finished_at) = finished_at {
|
||||||
let db_finished_at = self
|
let db_finished_at = this
|
||||||
.finished_at
|
.finished_at
|
||||||
.get(&rtxn, &BEI128::new(finished_at.unix_timestamp_nanos()))
|
.get(&rtxn, &BEI128::new(finished_at.unix_timestamp_nanos()))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -384,9 +385,9 @@ impl IndexScheduler {
|
|||||||
assert!(db_finished_at.contains(task_id));
|
assert!(db_finished_at.contains(task_id));
|
||||||
}
|
}
|
||||||
if let Some(canceled_by) = canceled_by {
|
if let Some(canceled_by) = canceled_by {
|
||||||
let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap();
|
let db_canceled_tasks = this.get_status(&rtxn, Status::Canceled).unwrap();
|
||||||
assert!(db_canceled_tasks.contains(uid));
|
assert!(db_canceled_tasks.contains(uid));
|
||||||
let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap();
|
let db_canceling_task = this.get_task(&rtxn, canceled_by).unwrap().unwrap();
|
||||||
assert_eq!(db_canceling_task.status, Status::Succeeded);
|
assert_eq!(db_canceling_task.status, Status::Succeeded);
|
||||||
match db_canceling_task.kind {
|
match db_canceling_task.kind {
|
||||||
KindWithContent::TaskCancelation { query: _, tasks } => {
|
KindWithContent::TaskCancelation { query: _, tasks } => {
|
||||||
@ -427,7 +428,7 @@ impl IndexScheduler {
|
|||||||
Details::IndexInfo { primary_key: pk1 } => match &kind {
|
Details::IndexInfo { primary_key: pk1 } => match &kind {
|
||||||
KindWithContent::IndexCreation { index_uid, primary_key: pk2 }
|
KindWithContent::IndexCreation { index_uid, primary_key: pk2 }
|
||||||
| KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => {
|
| KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => {
|
||||||
self.index_tasks
|
this.index_tasks
|
||||||
.get(&rtxn, index_uid.as_str())
|
.get(&rtxn, index_uid.as_str())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -535,23 +536,23 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(self.get_status(&rtxn, status).unwrap().contains(uid));
|
assert!(this.get_status(&rtxn, status).unwrap().contains(uid));
|
||||||
assert!(self.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
|
assert!(this.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
|
||||||
|
|
||||||
if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind {
|
if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind {
|
||||||
match status {
|
match status {
|
||||||
Status::Enqueued | Status::Processing => {
|
Status::Enqueued | Status::Processing => {
|
||||||
assert!(self
|
assert!(this
|
||||||
.file_store
|
.file_store
|
||||||
.all_uuids()
|
.all_uuids()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.any(|uuid| uuid.as_ref().unwrap() == &content_file),
|
.any(|uuid| uuid.as_ref().unwrap() == &content_file),
|
||||||
"Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.",
|
"Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.",
|
||||||
self.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
|
this.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Status::Succeeded | Status::Failed | Status::Canceled => {
|
Status::Succeeded | Status::Failed | Status::Canceled => {
|
||||||
assert!(self
|
assert!(this
|
||||||
.file_store
|
.file_store
|
||||||
.all_uuids()
|
.all_uuids()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
Loading…
Reference in New Issue
Block a user