mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 03:47:02 +02:00
Make the changes to use heed v0.20-alpha.6
This commit is contained in:
parent
56a0d91ecd
commit
0d4482625a
54 changed files with 611 additions and 477 deletions
|
@ -32,7 +32,7 @@ use meilisearch_types::milli::heed::CompactionOption;
|
|||
use meilisearch_types::milli::update::{
|
||||
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
|
||||
};
|
||||
use meilisearch_types::milli::{self, Filter, BEU32};
|
||||
use meilisearch_types::milli::{self, Filter};
|
||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
||||
|
@ -715,7 +715,7 @@ impl IndexScheduler {
|
|||
|
||||
// 2. Snapshot the index-scheduler LMDB env
|
||||
//
|
||||
// When we call copy_to_path, LMDB opens a read transaction by itself,
|
||||
// When we call copy_to_file, LMDB opens a read transaction by itself,
|
||||
// we can't provide our own. It is an issue as we would like to know
|
||||
// the update files to copy but new ones can be enqueued between the copy
|
||||
// of the env and the new transaction we open to retrieve the enqueued tasks.
|
||||
|
@ -728,7 +728,7 @@ impl IndexScheduler {
|
|||
// 2.1 First copy the LMDB env of the index-scheduler
|
||||
let dst = temp_snapshot_dir.path().join("tasks");
|
||||
fs::create_dir_all(&dst)?;
|
||||
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 2.2 Create a read transaction on the index-scheduler
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
@ -766,7 +766,7 @@ impl IndexScheduler {
|
|||
.map_size(1024 * 1024 * 1024) // 1 GiB
|
||||
.max_dbs(2)
|
||||
.open(&self.auth_path)?;
|
||||
auth.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 5. Copy and tarball the flat snapshot
|
||||
// 5.1 Find the original name of the database
|
||||
|
@ -1106,7 +1106,7 @@ impl IndexScheduler {
|
|||
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
|
||||
let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
swap_index_uid_in_task(&mut task, (lhs, rhs));
|
||||
self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
|
||||
self.all_tasks.put(wtxn, &task_id, &task)?;
|
||||
}
|
||||
|
||||
// 4. remove the task from indexuid = before_name
|
||||
|
@ -1132,7 +1132,7 @@ impl IndexScheduler {
|
|||
/// The list of processed tasks.
|
||||
fn apply_index_operation<'i>(
|
||||
&self,
|
||||
index_wtxn: &mut RwTxn<'i, '_>,
|
||||
index_wtxn: &mut RwTxn<'i>,
|
||||
index: &'i Index,
|
||||
operation: IndexOperation,
|
||||
) -> Result<Vec<Task>> {
|
||||
|
@ -1479,10 +1479,10 @@ impl IndexScheduler {
|
|||
}
|
||||
|
||||
for task in to_delete_tasks.iter() {
|
||||
self.all_tasks.delete(wtxn, &BEU32::new(task))?;
|
||||
self.all_tasks.delete(wtxn, &task)?;
|
||||
}
|
||||
for canceled_by in affected_canceled_by {
|
||||
let canceled_by = BEU32::new(canceled_by);
|
||||
let canceled_by = canceled_by;
|
||||
if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? {
|
||||
tasks -= &to_delete_tasks;
|
||||
if tasks.is_empty() {
|
||||
|
@ -1530,14 +1530,14 @@ impl IndexScheduler {
|
|||
task.details = task.details.map(|d| d.to_failed());
|
||||
self.update_task(wtxn, &task)?;
|
||||
}
|
||||
self.canceled_by.put(wtxn, &BEU32::new(cancel_task_id), &tasks_to_cancel)?;
|
||||
self.canceled_by.put(wtxn, &cancel_task_id, &tasks_to_cancel)?;
|
||||
|
||||
Ok(content_files_to_delete)
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_document_by_filter<'a>(
|
||||
wtxn: &mut RwTxn<'a, '_>,
|
||||
wtxn: &mut RwTxn<'a>,
|
||||
filter: &serde_json::Value,
|
||||
indexer_config: &IndexerConfig,
|
||||
must_stop_processing: MustStopProcessing,
|
||||
|
|
|
@ -5,8 +5,7 @@ use std::collections::BTreeMap;
|
|||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use meilisearch_types::heed::flags::Flags;
|
||||
use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
|
||||
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
|
||||
use meilisearch_types::milli::Index;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
@ -309,7 +308,7 @@ fn create_or_open_index(
|
|||
options.map_size(clamp_to_page_size(map_size));
|
||||
options.max_readers(1024);
|
||||
if enable_mdb_writemap {
|
||||
unsafe { options.flag(Flags::MdbWriteMap) };
|
||||
unsafe { options.flags(EnvFlags::WRITE_MAP) };
|
||||
}
|
||||
|
||||
if let Some((created, updated)) = date {
|
||||
|
|
|
@ -47,8 +47,9 @@ pub use features::RoFeatures;
|
|||
use file_store::FileStore;
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::byteorder::BE;
|
||||
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
|
||||
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
|
||||
|
@ -64,8 +65,7 @@ use uuid::Uuid;
|
|||
use crate::index_mapper::IndexMapper;
|
||||
use crate::utils::{check_index_swap_validity, clamp_to_page_size};
|
||||
|
||||
pub(crate) type BEI128 =
|
||||
meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
|
||||
pub(crate) type BEI128 = I128<BE>;
|
||||
|
||||
/// Defines a subset of tasks to be retrieved from the [`IndexScheduler`].
|
||||
///
|
||||
|
@ -278,7 +278,7 @@ pub struct IndexScheduler {
|
|||
pub(crate) file_store: FileStore,
|
||||
|
||||
// The main database, it contains all the tasks accessible by their Id.
|
||||
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
|
||||
pub(crate) all_tasks: Database<BEU32, SerdeJson<Task>>,
|
||||
|
||||
/// All the tasks ids grouped by their status.
|
||||
// TODO we should not be able to serialize a `Status::Processing` in this database.
|
||||
|
@ -289,16 +289,16 @@ pub struct IndexScheduler {
|
|||
pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
|
||||
|
||||
/// Store the tasks that were canceled by a task uid
|
||||
pub(crate) canceled_by: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
|
||||
pub(crate) canceled_by: Database<BEU32, RoaringBitmapCodec>,
|
||||
|
||||
/// Store the task ids of tasks which were enqueued at a specific date
|
||||
pub(crate) enqueued_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
pub(crate) enqueued_at: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
|
||||
/// Store the task ids of finished tasks which started being processed at a specific date
|
||||
pub(crate) started_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
pub(crate) started_at: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
|
||||
/// Store the task ids of tasks which finished at a specific date
|
||||
pub(crate) finished_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
pub(crate) finished_at: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
|
||||
/// In charge of creating, opening, storing and returning indexes.
|
||||
pub(crate) index_mapper: IndexMapper,
|
||||
|
@ -730,9 +730,7 @@ impl IndexScheduler {
|
|||
if let Some(canceled_by) = &query.canceled_by {
|
||||
let mut all_canceled_tasks = RoaringBitmap::new();
|
||||
for cancel_task_uid in canceled_by {
|
||||
if let Some(canceled_by_uid) =
|
||||
self.canceled_by.get(rtxn, &BEU32::new(*cancel_task_uid))?
|
||||
{
|
||||
if let Some(canceled_by_uid) = self.canceled_by.get(rtxn, &*cancel_task_uid)? {
|
||||
all_canceled_tasks |= canceled_by_uid;
|
||||
}
|
||||
}
|
||||
|
@ -983,7 +981,7 @@ impl IndexScheduler {
|
|||
|
||||
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
|
||||
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
|
||||
&& (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50
|
||||
&& (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 50
|
||||
{
|
||||
return Err(Error::NoSpaceLeftInTaskQueue);
|
||||
}
|
||||
|
@ -1009,7 +1007,7 @@ impl IndexScheduler {
|
|||
// Get rid of the mutability.
|
||||
let task = task;
|
||||
|
||||
self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
|
||||
self.all_tasks.put_with_flags(&mut wtxn, PutFlags::APPEND, &task.uid, &task)?;
|
||||
|
||||
for index in task.indexes() {
|
||||
self.update_index(&mut wtxn, index, |bitmap| {
|
||||
|
@ -1187,7 +1185,7 @@ impl IndexScheduler {
|
|||
| Err(Error::AbortedTask) => {
|
||||
#[cfg(test)]
|
||||
self.breakpoint(Breakpoint::AbortedIndexation);
|
||||
wtxn.abort().map_err(Error::HeedTransaction)?;
|
||||
wtxn.abort();
|
||||
|
||||
// We make sure that we don't call `stop_processing` on the `processing_tasks`,
|
||||
// this is because we want to let the next tick call `create_next_batch` and keep
|
||||
|
@ -1208,7 +1206,7 @@ impl IndexScheduler {
|
|||
let index_uid = index_uid.unwrap();
|
||||
// fixme: handle error more gracefully? not sure when this could happen
|
||||
self.index_mapper.resize_index(&wtxn, &index_uid)?;
|
||||
wtxn.abort().map_err(Error::HeedTransaction)?;
|
||||
wtxn.abort();
|
||||
|
||||
return Ok(TickOutcome::TickAgain(0));
|
||||
}
|
||||
|
@ -1354,7 +1352,7 @@ impl IndexScheduler {
|
|||
|
||||
pub struct Dump<'a> {
|
||||
index_scheduler: &'a IndexScheduler,
|
||||
wtxn: RwTxn<'a, 'a>,
|
||||
wtxn: RwTxn<'a>,
|
||||
|
||||
indexes: HashMap<String, RoaringBitmap>,
|
||||
statuses: HashMap<Status, RoaringBitmap>,
|
||||
|
@ -1469,7 +1467,7 @@ impl<'a> Dump<'a> {
|
|||
},
|
||||
};
|
||||
|
||||
self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?;
|
||||
self.index_scheduler.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
|
||||
|
||||
for index in task.indexes() {
|
||||
match self.indexes.get_mut(index) {
|
||||
|
|
|
@ -3,9 +3,9 @@
|
|||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::ops::Bound;
|
||||
|
||||
use meilisearch_types::heed::types::{DecodeIgnore, OwnedType};
|
||||
use meilisearch_types::heed::types::DecodeIgnore;
|
||||
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::milli::CboRoaringBitmapCodec;
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status};
|
||||
use roaring::{MultiOps, RoaringBitmap};
|
||||
use time::OffsetDateTime;
|
||||
|
@ -18,7 +18,7 @@ impl IndexScheduler {
|
|||
}
|
||||
|
||||
pub(crate) fn last_task_id(&self, rtxn: &RoTxn) -> Result<Option<TaskId>> {
|
||||
Ok(self.all_tasks.remap_data_type::<DecodeIgnore>().last(rtxn)?.map(|(k, _)| k.get() + 1))
|
||||
Ok(self.all_tasks.remap_data_type::<DecodeIgnore>().last(rtxn)?.map(|(k, _)| k + 1))
|
||||
}
|
||||
|
||||
pub(crate) fn next_task_id(&self, rtxn: &RoTxn) -> Result<TaskId> {
|
||||
|
@ -26,7 +26,7 @@ impl IndexScheduler {
|
|||
}
|
||||
|
||||
pub(crate) fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result<Option<Task>> {
|
||||
Ok(self.all_tasks.get(rtxn, &BEU32::new(task_id))?)
|
||||
Ok(self.all_tasks.get(rtxn, &task_id)?)
|
||||
}
|
||||
|
||||
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
|
||||
|
@ -88,7 +88,7 @@ impl IndexScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
self.all_tasks.put(wtxn, &BEU32::new(task.uid), task)?;
|
||||
self.all_tasks.put(wtxn, &task.uid, task)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -169,11 +169,11 @@ impl IndexScheduler {
|
|||
|
||||
pub(crate) fn insert_task_datetime(
|
||||
wtxn: &mut RwTxn,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
database: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
time: OffsetDateTime,
|
||||
task_id: TaskId,
|
||||
) -> Result<()> {
|
||||
let timestamp = BEI128::new(time.unix_timestamp_nanos());
|
||||
let timestamp = time.unix_timestamp_nanos();
|
||||
let mut task_ids = database.get(wtxn, ×tamp)?.unwrap_or_default();
|
||||
task_ids.insert(task_id);
|
||||
database.put(wtxn, ×tamp, &RoaringBitmap::from_iter(task_ids))?;
|
||||
|
@ -182,11 +182,11 @@ pub(crate) fn insert_task_datetime(
|
|||
|
||||
pub(crate) fn remove_task_datetime(
|
||||
wtxn: &mut RwTxn,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
database: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
time: OffsetDateTime,
|
||||
task_id: TaskId,
|
||||
) -> Result<()> {
|
||||
let timestamp = BEI128::new(time.unix_timestamp_nanos());
|
||||
let timestamp = time.unix_timestamp_nanos();
|
||||
if let Some(mut existing) = database.get(wtxn, ×tamp)? {
|
||||
existing.remove(task_id);
|
||||
if existing.is_empty() {
|
||||
|
@ -202,7 +202,7 @@ pub(crate) fn remove_task_datetime(
|
|||
pub(crate) fn keep_tasks_within_datetimes(
|
||||
rtxn: &RoTxn,
|
||||
tasks: &mut RoaringBitmap,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
database: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
after: Option<OffsetDateTime>,
|
||||
before: Option<OffsetDateTime>,
|
||||
) -> Result<()> {
|
||||
|
@ -213,8 +213,8 @@ pub(crate) fn keep_tasks_within_datetimes(
|
|||
(Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
|
||||
};
|
||||
let mut collected_task_ids = RoaringBitmap::new();
|
||||
let start = map_bound(start, |b| BEI128::new(b.unix_timestamp_nanos()));
|
||||
let end = map_bound(end, |b| BEI128::new(b.unix_timestamp_nanos()));
|
||||
let start = map_bound(start, |b| b.unix_timestamp_nanos());
|
||||
let end = map_bound(end, |b| b.unix_timestamp_nanos());
|
||||
let iter = database.range(rtxn, &(start, end))?;
|
||||
for r in iter {
|
||||
let (_timestamp, task_ids) = r?;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::borrow::Cow;
|
||||
use std::convert::TryInto;
|
||||
|
||||
use meilisearch_types::heed::{BytesDecode, BytesEncode};
|
||||
use meilisearch_types::heed::{BoxedError, BytesDecode, BytesEncode};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// A heed codec for value of struct Uuid.
|
||||
|
@ -10,15 +10,15 @@ pub struct UuidCodec;
|
|||
impl<'a> BytesDecode<'a> for UuidCodec {
|
||||
type DItem = Uuid;
|
||||
|
||||
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
|
||||
bytes.try_into().ok().map(Uuid::from_bytes)
|
||||
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
|
||||
bytes.try_into().map(Uuid::from_bytes).map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
impl BytesEncode<'_> for UuidCodec {
|
||||
type EItem = Uuid;
|
||||
|
||||
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
|
||||
Some(Cow::Borrowed(item.as_bytes()))
|
||||
fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> {
|
||||
Ok(Cow::Borrowed(item.as_bytes()))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue