Merge branch 'main' of github.com:meilisearch/meilisearch into CNLHC/change_json_error_message

Liu Hancheng committed 2022-03-25 19:53:13 +08:00
commit 193c666bf9
62 changed files with 1463 additions and 898 deletions

View file

@@ -3,9 +3,9 @@ use std::io::{BufReader, Seek, SeekFrom, Write};
 use std::path::Path;

 use anyhow::Context;
-use heed::{EnvOpenOptions, RoTxn};
 use indexmap::IndexMap;
 use milli::documents::DocumentBatchReader;
+use milli::heed::{EnvOpenOptions, RoTxn};
 use milli::update::{IndexDocumentsConfig, IndexerConfig};
 use serde::{Deserialize, Serialize};

View file

@@ -21,7 +21,7 @@ pub enum IndexError {

 internal_error!(
     IndexError: std::io::Error,
-    heed::Error,
+    milli::heed::Error,
     fst::Error,
     serde_json::Error,
     update_file_store::UpdateFileStoreError,
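
Aside: internal_error! is Meilisearch's own helper and its definition is not part of this diff. A hypothetical sketch of what such a macro expands to — one From<E> impl per listed type so `?` can convert each error — with the variant name assumed:

macro_rules! internal_error {
    ($target:ty : $($other:ty),*) => {
        $(
            impl From<$other> for $target {
                fn from(other: $other) -> Self {
                    Self::Internal(Box::new(other))
                }
            }
        )*
    };
}

#[derive(Debug)]
enum IndexError {
    Internal(Box<dyn std::error::Error + Send + Sync>),
}

internal_error!(IndexError: std::io::Error, std::fmt::Error);

fn main() {
    let err: IndexError = std::io::Error::new(std::io::ErrorKind::Other, "disk full").into();
    println!("{err:?}");
}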

View file

@@ -5,12 +5,12 @@ use std::ops::Deref;
 use std::path::Path;
 use std::sync::Arc;

-use chrono::{DateTime, Utc};
-use heed::{EnvOpenOptions, RoTxn};
+use milli::heed::{EnvOpenOptions, RoTxn};
 use milli::update::{IndexerConfig, Setting};
 use milli::{obkv_to_json, FieldDistribution, FieldId};
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
+use time::OffsetDateTime;
 use uuid::Uuid;

 use crate::EnvSizer;
@@ -24,8 +24,10 @@ pub type Document = Map<String, Value>;
 #[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct IndexMeta {
-    pub created_at: DateTime<Utc>,
-    pub updated_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub created_at: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    pub updated_at: OffsetDateTime,
     pub primary_key: Option<String>,
 }
@@ -35,7 +37,7 @@ impl IndexMeta {
         Self::new_txn(index, &txn)
     }

-    pub fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
+    pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result<Self> {
         let created_at = index.created_at(txn)?;
         let updated_at = index.updated_at(txn)?;
         let primary_key = index.primary_key(txn)?.map(String::from);
@@ -248,7 +250,7 @@ impl Index {
     fn fields_to_display<S: AsRef<str>>(
         &self,
-        txn: &heed::RoTxn,
+        txn: &milli::heed::RoTxn,
         attributes_to_retrieve: &Option<Vec<S>>,
         fields_ids_map: &milli::FieldsIdsMap,
     ) -> Result<Vec<FieldId>> {
@@ -276,7 +278,7 @@ impl Index {
         let _txn = self.write_txn()?;
         self.inner
             .env
-            .copy_to_path(dst, heed::CompactionOption::Enabled)?;
+            .copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
         Ok(())
     }
 }
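
The #[serde(with = "time::serde::rfc3339")] attribute above swaps the previous chrono representation for RFC 3339 strings. A minimal round-trip sketch, assuming the time crate is compiled with its serde-well-known feature:

use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

#[derive(Debug, Serialize, Deserialize)]
struct IndexMeta {
    // Serialized and parsed as an RFC 3339 string such as
    // "2022-03-25T11:53:13Z" instead of the type's default format.
    #[serde(with = "time::serde::rfc3339")]
    created_at: OffsetDateTime,
}

fn main() {
    let meta: IndexMeta =
        serde_json::from_str(r#"{"created_at":"2022-03-25T11:53:13Z"}"#).unwrap();
    println!("{}", serde_json::to_string(&meta).unwrap());
}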

View file

@@ -176,7 +176,7 @@ pub struct Facets {
 impl Index {
     fn update_primary_key_txn<'a, 'b>(
         &'a self,
-        txn: &mut heed::RwTxn<'a, 'b>,
+        txn: &mut milli::heed::RwTxn<'a, 'b>,
         primary_key: String,
     ) -> Result<IndexMeta> {
         let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref());

View file

@@ -3,9 +3,10 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;

 use async_stream::stream;
-use chrono::Utc;
 use futures::{lock::Mutex, stream::StreamExt};
 use log::{error, trace};
+use time::macros::format_description;
+use time::OffsetDateTime;
 use tokio::sync::{mpsc, oneshot, RwLock};

 use super::error::{DumpActorError, Result};
@@ -29,7 +30,11 @@ pub struct DumpActor {

 /// Generate uid from creation date
 fn generate_uid() -> String {
-    Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
+    OffsetDateTime::now_utc()
+        .format(format_description!(
+            "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
+        ))
+        .unwrap()
 }

 impl DumpActor {
@@ -154,3 +159,33 @@ impl DumpActor {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_generate_uid() {
+        let current = OffsetDateTime::now_utc();
+        let uid = generate_uid();
+        let (date, time) = uid.split_once('-').unwrap();
+        let date = time::Date::parse(
+            date,
+            &format_description!("[year repr:full][month repr:numerical][day padding:zero]"),
+        )
+        .unwrap();
+        let time = time::Time::parse(
+            time,
+            &format_description!(
+                "[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
+            ),
+        )
+        .unwrap();
+        let datetime = time::PrimitiveDateTime::new(date, time);
+        let datetime = datetime.assume_utc();
+        assert!(current - datetime < time::Duration::SECOND);
+    }
+}
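
For reference, a condensed sketch of the time 0.3 API replacing chrono's "%Y%m%d-%H%M%S%3f" here: format_description! validates the description at compile time, and the components spelled out above (repr:full, padding:zero) are the defaults, so a shorter description yields the same uid shape:

use time::macros::format_description;
use time::OffsetDateTime;

fn generate_uid() -> String {
    OffsetDateTime::now_utc()
        .format(format_description!(
            "[year][month][day]-[hour][minute][second][subsecond digits:3]"
        ))
        .unwrap()
}

fn main() {
    println!("{}", generate_uid()); // e.g. 20220325-115313123
}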

View file

@@ -1,8 +1,8 @@
 use anyhow::bail;
-use chrono::{DateTime, Utc};
 use meilisearch_error::Code;
 use milli::update::IndexDocumentsMethod;
 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
 use uuid::Uuid;

 use crate::index::{Settings, Unchecked};
@@ -51,7 +51,8 @@ pub enum UpdateMeta {
 pub struct Enqueued {
     pub update_id: u64,
     pub meta: UpdateMeta,
-    pub enqueued_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
     pub content: Option<Uuid>,
 }
@@ -59,7 +60,8 @@ pub struct Enqueued {
 #[serde(rename_all = "camelCase")]
 pub struct Processed {
     pub success: UpdateResult,
-    pub processed_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub processed_at: OffsetDateTime,
     #[serde(flatten)]
     pub from: Processing,
 }
@@ -69,7 +71,8 @@ pub struct Processed {
 pub struct Processing {
     #[serde(flatten)]
     pub from: Enqueued,
-    pub started_processing_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub started_processing_at: OffsetDateTime,
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -77,7 +80,8 @@ pub struct Processing {
 pub struct Aborted {
     #[serde(flatten)]
     pub from: Enqueued,
-    pub aborted_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub aborted_at: OffsetDateTime,
 }

 #[derive(Debug, Serialize, Deserialize)]
@@ -86,7 +90,8 @@ pub struct Failed {
     #[serde(flatten)]
     pub from: Processing,
     pub error: ResponseError,
-    pub failed_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub failed_at: OffsetDateTime,
 }

 #[derive(Debug, Serialize, Deserialize)]

View file

@@ -1,7 +1,7 @@
-use chrono::{DateTime, Utc};
 use meilisearch_error::{Code, ResponseError};
 use milli::update::IndexDocumentsMethod;
 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
 use uuid::Uuid;

 use crate::index::{Settings, Unchecked};
@@ -107,7 +107,8 @@ pub enum UpdateMeta {
 pub struct Enqueued {
     pub update_id: u64,
     pub meta: Update,
-    pub enqueued_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
 }

 impl Enqueued {
@@ -122,7 +123,8 @@ impl Enqueued {
 #[serde(rename_all = "camelCase")]
 pub struct Processed {
     pub success: v2::UpdateResult,
-    pub processed_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub processed_at: OffsetDateTime,
     #[serde(flatten)]
     pub from: Processing,
 }
@@ -144,7 +146,8 @@ impl Processed {
 pub struct Processing {
     #[serde(flatten)]
     pub from: Enqueued,
-    pub started_processing_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub started_processing_at: OffsetDateTime,
 }

 impl Processing {
@@ -163,7 +166,8 @@ pub struct Failed {
     pub from: Processing,
     pub msg: String,
     pub code: Code,
-    pub failed_at: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    pub failed_at: OffsetDateTime,
 }

 impl Failed {

View file

@@ -18,7 +18,7 @@ pub enum DumpActorError {
 }

 internal_error!(
-    DumpActorError: heed::Error,
+    DumpActorError: milli::heed::Error,
    std::io::Error,
     tokio::task::JoinError,
     tokio::sync::oneshot::error::RecvError,

View file

@@ -1,9 +1,9 @@
 use std::path::Path;
 use std::sync::Arc;

-use heed::EnvOpenOptions;
 use log::info;
 use meilisearch_auth::AuthController;
+use milli::heed::EnvOpenOptions;

 use crate::analytics;
 use crate::index_controller::dump_actor::Metadata;

View file

@@ -3,9 +3,9 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;

 use anyhow::bail;
-use chrono::{DateTime, Utc};
 use log::{info, trace};
 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;

 pub use actor::DumpActor;
 pub use handle_impl::*;
@@ -40,7 +40,8 @@ pub struct Metadata {
     db_version: String,
     index_db_size: usize,
     update_db_size: usize,
-    dump_date: DateTime<Utc>,
+    #[serde(with = "time::serde::rfc3339")]
+    dump_date: OffsetDateTime,
 }

 impl Metadata {
@@ -49,7 +50,7 @@ impl Metadata {
             db_version: env!("CARGO_PKG_VERSION").to_string(),
             index_db_size,
             update_db_size,
-            dump_date: Utc::now(),
+            dump_date: OffsetDateTime::now_utc(),
         }
     }
 }
@@ -144,7 +145,7 @@ impl MetadataVersion {
         }
     }

-    pub fn dump_date(&self) -> Option<&DateTime<Utc>> {
+    pub fn dump_date(&self) -> Option<&OffsetDateTime> {
         match self {
             MetadataVersion::V1(_) => None,
             MetadataVersion::V2(meta) | MetadataVersion::V3(meta) | MetadataVersion::V4(meta) => {
@@ -169,9 +170,13 @@ pub struct DumpInfo {
     pub status: DumpStatus,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub error: Option<String>,
-    started_at: DateTime<Utc>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    finished_at: Option<DateTime<Utc>>,
+    #[serde(with = "time::serde::rfc3339")]
+    started_at: OffsetDateTime,
+    #[serde(
+        skip_serializing_if = "Option::is_none",
+        with = "time::serde::rfc3339::option"
+    )]
+    finished_at: Option<OffsetDateTime>,
 }

 impl DumpInfo {
@@ -180,19 +185,19 @@ impl DumpInfo {
             uid,
             status,
             error: None,
-            started_at: Utc::now(),
+            started_at: OffsetDateTime::now_utc(),
             finished_at: None,
         }
     }

     pub fn with_error(&mut self, error: String) {
         self.status = DumpStatus::Failed;
-        self.finished_at = Some(Utc::now());
+        self.finished_at = Some(OffsetDateTime::now_utc());
         self.error = Some(error);
     }

     pub fn done(&mut self) {
-        self.finished_at = Some(Utc::now());
+        self.finished_at = Some(OffsetDateTime::now_utc());
         self.status = DumpStatus::Done;
     }
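
A small sketch of the combined attribute on finished_at (again assuming time's serde-well-known feature): time::serde::rfc3339::option handles the Option wrapper, while skip_serializing_if omits the field entirely when it is None:

use serde::Serialize;
use time::OffsetDateTime;

#[derive(Serialize)]
struct DumpInfo {
    #[serde(with = "time::serde::rfc3339")]
    started_at: OffsetDateTime,
    #[serde(
        skip_serializing_if = "Option::is_none",
        with = "time::serde::rfc3339::option"
    )]
    finished_at: Option<OffsetDateTime>,
}

fn main() {
    let info = DumpInfo {
        started_at: OffsetDateTime::now_utc(),
        finished_at: None,
    };
    // Prints only {"started_at":"..."}; finished_at is skipped, not null.
    println!("{}", serde_json::to_string(&info).unwrap());
}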

View file

@@ -8,11 +8,11 @@ use std::time::Duration;

 use actix_web::error::PayloadError;
 use bytes::Bytes;
-use chrono::{DateTime, Utc};
 use futures::Stream;
 use futures::StreamExt;
 use milli::update::IndexDocumentsMethod;
 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
 use tokio::sync::{mpsc, RwLock};
 use tokio::task::spawn_blocking;
 use tokio::time::sleep;
@@ -48,8 +48,8 @@ pub type Payload = Box<
     dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
 >;

-pub fn open_meta_env(path: &Path, size: usize) -> heed::Result<heed::Env> {
-    let mut options = heed::EnvOpenOptions::new();
+pub fn open_meta_env(path: &Path, size: usize) -> milli::heed::Result<milli::heed::Env> {
+    let mut options = milli::heed::EnvOpenOptions::new();
     options.map_size(size);
     options.max_dbs(20);
     options.open(path)
@@ -114,7 +114,8 @@ impl fmt::Display for DocumentAdditionFormat {
 #[serde(rename_all = "camelCase")]
 pub struct Stats {
     pub database_size: u64,
-    pub last_update: Option<DateTime<Utc>>,
+    #[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
+    pub last_update: Option<OffsetDateTime>,
     pub indexes: BTreeMap<String, IndexStats>,
 }
@@ -582,7 +583,7 @@
     }

     pub async fn get_all_stats(&self, search_rules: &SearchRules) -> Result<Stats> {
-        let mut last_task: Option<DateTime<_>> = None;
+        let mut last_task: Option<OffsetDateTime> = None;
         let mut indexes = BTreeMap::new();
         let mut database_size = 0;
         let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?;
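
The open_meta_env change is representative of the whole commit: heed is now reached through milli's re-export, so only one heed version can appear in the dependency graph. A usage sketch (path illustrative, assuming the re-export shown in this commit):

use milli::heed::EnvOpenOptions;

fn main() -> anyhow::Result<()> {
    let path = "/tmp/meili-meta"; // illustrative location
    std::fs::create_dir_all(path)?; // LMDB expects an existing directory
    let mut options = EnvOpenOptions::new();
    options.map_size(100 * 1024 * 1024); // maximum LMDB map size: 100 MiB
    options.max_dbs(20); // named databases allowed in this env
    let _env = options.open(path)?;
    Ok(())
}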

View file

@@ -45,7 +45,7 @@ impl From<OneshotRecvError> for IndexResolverError {
 }

 internal_error!(
-    IndexResolverError: heed::Error,
+    IndexResolverError: milli::heed::Error,
     uuid::Error,
     std::io::Error,
     tokio::task::JoinError,

View file

@@ -4,8 +4,8 @@ use std::io::{BufRead, BufReader, Write};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;

-use heed::types::{SerdeBincode, Str};
-use heed::{CompactionOption, Database, Env};
+use milli::heed::types::{SerdeBincode, Str};
+use milli::heed::{CompactionOption, Database, Env};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
@@ -56,7 +56,7 @@ impl Drop for HeedMetaStore {
 }

 impl HeedMetaStore {
-    pub fn new(env: Arc<heed::Env>) -> Result<Self> {
+    pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
         let db = env.create_database(Some("uuids"))?;
         Ok(Self { env, db })
     }
@@ -153,7 +153,7 @@ impl HeedMetaStore {
         Ok(())
     }

-    pub fn load_dump(src: impl AsRef<Path>, env: Arc<heed::Env>) -> Result<()> {
+    pub fn load_dump(src: impl AsRef<Path>, env: Arc<milli::heed::Env>) -> Result<()> {
         let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
         let indexes = File::open(&src_indexes)?;
         let mut indexes = BufReader::new(indexes);
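
For context, HeedMetaStore keeps index metadata in a typed LMDB database. A self-contained sketch of that pattern with heed's Str/SerdeBincode codecs (the value shape here is illustrative, not the real schema):

use milli::heed::types::{SerdeBincode, Str};
use milli::heed::{Database, EnvOpenOptions};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Meta {
    uuid: u128, // stand-in for the real Uuid field
}

fn main() -> anyhow::Result<()> {
    std::fs::create_dir_all("/tmp/uuids-db")?;
    let mut options = EnvOpenOptions::new();
    options.map_size(10 * 1024 * 1024);
    options.max_dbs(1);
    let env = options.open("/tmp/uuids-db")?;

    // Keys are UTF-8 strings, values are bincode-encoded structs.
    let db: Database<Str, SerdeBincode<Meta>> = env.create_database(Some("uuids"))?;

    let mut wtxn = env.write_txn()?;
    db.put(&mut wtxn, "movies", &Meta { uuid: 42 })?;
    wtxn.commit()?;

    let rtxn = env.read_txn()?;
    println!("{:?}", db.get(&rtxn, "movies")?);
    Ok(())
}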

View file

@@ -6,14 +6,14 @@ use std::convert::{TryFrom, TryInto};
 use std::path::Path;
 use std::sync::Arc;

-use chrono::Utc;
 use error::{IndexResolverError, Result};
-use heed::Env;
 use index_store::{IndexStore, MapIndexStore};
 use meilisearch_error::ResponseError;
 use meta_store::{HeedMetaStore, IndexMetaStore};
+use milli::heed::Env;
 use milli::update::{DocumentDeletionResult, IndexerConfig};
 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
 use tokio::sync::oneshot;
 use tokio::task::spawn_blocking;
 use uuid::Uuid;
@@ -39,7 +39,7 @@ pub fn create_index_resolver(
     path: impl AsRef<Path>,
     index_size: usize,
     indexer_opts: &IndexerOpts,
-    meta_env: Arc<heed::Env>,
+    meta_env: Arc<milli::heed::Env>,
     file_store: UpdateFileStore,
 ) -> anyhow::Result<HardStateIndexResolver> {
     let uuid_store = HeedMetaStore::new(meta_env)?;
@@ -115,18 +115,19 @@
             self.process_document_addition_batch(batch).await
         } else {
             if let Some(task) = batch.tasks.first_mut() {
-                task.events.push(TaskEvent::Processing(Utc::now()));
+                task.events
+                    .push(TaskEvent::Processing(OffsetDateTime::now_utc()));

                 match self.process_task(task).await {
                     Ok(success) => {
                         task.events.push(TaskEvent::Succeded {
                             result: success,
-                            timestamp: Utc::now(),
+                            timestamp: OffsetDateTime::now_utc(),
                         });
                     }
                     Err(err) => task.events.push(TaskEvent::Failed {
                         error: err.into(),
-                        timestamp: Utc::now(),
+                        timestamp: OffsetDateTime::now_utc(),
                     }),
                 }
             }
@@ -225,7 +226,7 @@
             // If the index doesn't exist and we are not allowed to create it with the first
             // task, we must fails the whole batch.
-            let now = Utc::now();
+            let now = OffsetDateTime::now_utc();
             let index = match index {
                 Ok(index) => index,
                 Err(e) => {
@@ -253,17 +254,17 @@
             let event = match result {
                 Ok(Ok(result)) => TaskEvent::Succeded {
-                    timestamp: Utc::now(),
+                    timestamp: OffsetDateTime::now_utc(),
                     result: TaskResult::DocumentAddition {
                         indexed_documents: result.indexed_documents,
                     },
                 },
                 Ok(Err(e)) => TaskEvent::Failed {
-                    timestamp: Utc::now(),
+                    timestamp: OffsetDateTime::now_utc(),
                     error: e.into(),
                 },
                 Err(e) => TaskEvent::Failed {
-                    timestamp: Utc::now(),
+                    timestamp: OffsetDateTime::now_utc(),
                     error: IndexResolverError::from(e).into(),
                 },
             };
@@ -524,7 +525,7 @@ mod test {
             };

             if primary_key.is_some() {
                 mocker.when::<String, IndexResult<IndexMeta>>("update_primary_key")
-                    .then(move |_| Ok(IndexMeta{ created_at: Utc::now(), updated_at: Utc::now(), primary_key: None }));
+                    .then(move |_| Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None }));
             }
             mocker.when::<(IndexDocumentsMethod, Option<String>, UpdateFileStore, IntoIter<Uuid>), IndexResult<DocumentAdditionResult>>("update_documents")
                 .then(move |(_, _, _, _)| result());
@@ -569,7 +570,7 @@
                 | TaskContent::IndexCreation { primary_key } => {
                     if primary_key.is_some() {
                         let result = move || if !index_op_fails {
-                            Ok(IndexMeta{ created_at: Utc::now(), updated_at: Utc::now(), primary_key: None })
+                            Ok(IndexMeta{ created_at: OffsetDateTime::now_utc(), updated_at: OffsetDateTime::now_utc(), primary_key: None })
                         } else {
                             // return this error because it's easy to generate...
                             Err(IndexError::DocumentNotFound("a doc".into()))
@@ -640,7 +641,7 @@
         let update_file_store = UpdateFileStore::mock(mocker);
         let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store);

-        let batch = Batch { id: 1, created_at: Utc::now(), tasks: vec![task.clone()] };
+        let batch = Batch { id: 1, created_at: OffsetDateTime::now_utc(), tasks: vec![task.clone()] };
         let result = index_resolver.process_batch(batch).await;

         // Test for some expected output scenarios:

View file

@@ -13,8 +13,8 @@ mod update_file_store;
 use std::path::Path;

 pub use index_controller::MeiliSearch;
 pub use milli;
+pub use milli::heed;

 mod compression;
 pub mod document_formats;
@@ -25,7 +25,7 @@ pub trait EnvSizer {
     fn size(&self) -> u64;
 }

-impl EnvSizer for heed::Env {
+impl EnvSizer for milli::heed::Env {
     fn size(&self) -> u64 {
         WalkDir::new(self.path())
             .into_iter()
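
The new pub use milli::heed; is the pivot for every heed:: → milli::heed:: change in this commit: consumers now name heed through milli, so the two crates can never disagree on the heed version. A hypothetical, dependency-free illustration of the pattern:

// `wrapper` stands in for milli, `storage` for heed.
mod wrapper {
    pub mod storage {
        pub struct Env;
        pub fn open() -> Env {
            Env
        }
    }
    pub use storage as heed; // mirrors `pub use milli::heed;`
}

fn main() {
    // Downstream code reaches the dependency only via the re-export,
    // never by declaring its own (possibly different) version.
    let _env: wrapper::heed::Env = wrapper::heed::open();
}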

View file

@@ -48,24 +48,24 @@ pub struct IndexerOpts {
 pub struct SchedulerConfig {
     /// enable the autobatching experimental feature
     #[clap(long, hide = true)]
-    pub enable_autobatching: bool,
+    pub enable_auto_batching: bool,

     // The maximum number of updates of the same type that can be batched together.
     // If unspecified, this is unlimited. A value of 0 is interpreted as 1.
-    #[clap(long, requires = "enable-autobatching", hide = true)]
+    #[clap(long, requires = "enable-auto-batching", hide = true)]
     pub max_batch_size: Option<usize>,

     // The maximum number of documents in a document batch. Since batches must contain at least one
     // update for the scheduler to make progress, the number of documents in a batch will be at
     // least the number of documents of its first update.
-    #[clap(long, requires = "enable-autobatching", hide = true)]
+    #[clap(long, requires = "enable-auto-batching", hide = true)]
     pub max_documents_per_batch: Option<usize>,

     /// Debounce duration in seconds
     ///
     /// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before
     /// starting to process a batch of updates.
-    #[clap(long, requires = "enable-autobatching", hide = true)]
+    #[clap(long, requires = "enable-auto-batching", hide = true)]
     pub debounce_duration_sec: Option<u64>,
 }
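
The rename matters beyond style: in clap 3.x (assumed here), the derive macro uses the kebab-cased field name as the argument id, and requires refers to that id, which is why every requires string changes in lockstep with the field. A minimal sketch:

use clap::Parser;

#[derive(Debug, Parser)]
struct SchedulerConfig {
    #[clap(long)]
    enable_auto_batching: bool,

    // Rejected unless --enable-auto-batching is also present.
    #[clap(long, requires = "enable-auto-batching")]
    max_batch_size: Option<usize>,
}

fn main() {
    let cfg = SchedulerConfig::parse_from([
        "meilisearch",
        "--enable-auto-batching",
        "--max-batch-size",
        "10",
    ]);
    println!("{cfg:?}");
}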

View file

@@ -149,7 +149,7 @@ impl SnapshotJob {
         let env = open_meta_env(&self.src_path, self.meta_env_size)?;

         let dst = path.join("data.mdb");
-        env.copy_to_path(dst, heed::CompactionOption::Enabled)?;
+        env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;

         Ok(())
     }
@@ -180,12 +180,12 @@
             let dst = dst.join("data.mdb");

-            let mut options = heed::EnvOpenOptions::new();
+            let mut options = milli::heed::EnvOpenOptions::new();
             options.map_size(self.index_size);
             let index = milli::Index::new(options, entry.path())?;
             index
                 .env
-                .copy_to_path(dst, heed::CompactionOption::Enabled)?;
+                .copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
         }

         Ok(())
@@ -198,7 +198,7 @@
         let dst = dst.join("data.mdb");

         let env = open_auth_store_env(&auth_path)?;
-        env.copy_to_path(dst, heed::CompactionOption::Enabled)?;
+        env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;

         Ok(())
     }

View file

@@ -1,4 +1,4 @@
-use chrono::{DateTime, Utc};
+use time::OffsetDateTime;

 use super::task::Task;
@@ -7,7 +7,7 @@ pub type BatchId = u64;
 #[derive(Debug)]
 pub struct Batch {
     pub id: BatchId,
-    pub created_at: DateTime<Utc>,
+    pub created_at: OffsetDateTime,
     pub tasks: Vec<Task>,
 }

View file

@@ -16,7 +16,7 @@ pub enum TaskError {
 }

 internal_error!(
-    TaskError: heed::Error,
+    TaskError: milli::heed::Error,
     JoinError,
     std::io::Error,
     serde_json::Error,

View file

@@ -6,8 +6,8 @@ use std::sync::Arc;
 use std::time::Duration;

 use atomic_refcell::AtomicRefCell;
-use chrono::Utc;
 use milli::update::IndexDocumentsMethod;
+use time::OffsetDateTime;
 use tokio::sync::{watch, RwLock};

 use crate::options::SchedulerConfig;
@@ -218,7 +218,7 @@ impl Scheduler {
         let debounce_time = config.debounce_duration_sec;

         // Disable autobatching
-        if !config.enable_autobatching {
+        if !config.enable_auto_batching {
             config.max_batch_size = Some(1);
         }
@@ -357,7 +357,7 @@
                 tasks.iter_mut().for_each(|t| {
                     t.events.push(TaskEvent::Batched {
                         batch_id: id,
-                        timestamp: Utc::now(),
+                        timestamp: OffsetDateTime::now_utc(),
                     })
                 });
@@ -365,7 +365,7 @@
                 let batch = Batch {
                     id,
-                    created_at: Utc::now(),
+                    created_at: OffsetDateTime::now_utc(),
                     tasks,
                 };

View file

@@ -1,9 +1,9 @@
 use std::path::PathBuf;

-use chrono::{DateTime, Utc};
 use meilisearch_error::ResponseError;
 use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
 use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
 use tokio::sync::oneshot;
 use uuid::Uuid;
@@ -36,22 +36,33 @@ impl From<DocumentAdditionResult> for TaskResult {
 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
 #[cfg_attr(test, derive(proptest_derive::Arbitrary))]
 pub enum TaskEvent {
-    Created(#[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))] DateTime<Utc>),
+    Created(
+        #[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))]
+        #[serde(with = "time::serde::rfc3339")]
+        OffsetDateTime,
+    ),
     Batched {
         #[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))]
-        timestamp: DateTime<Utc>,
+        #[serde(with = "time::serde::rfc3339")]
+        timestamp: OffsetDateTime,
         batch_id: BatchId,
     },
-    Processing(#[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))] DateTime<Utc>),
+    Processing(
+        #[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))]
+        #[serde(with = "time::serde::rfc3339")]
+        OffsetDateTime,
+    ),
     Succeded {
         result: TaskResult,
         #[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))]
-        timestamp: DateTime<Utc>,
+        #[serde(with = "time::serde::rfc3339")]
+        timestamp: OffsetDateTime,
     },
     Failed {
         error: ResponseError,
         #[cfg_attr(test, proptest(strategy = "test::datetime_strategy()"))]
-        timestamp: DateTime<Utc>,
+        #[serde(with = "time::serde::rfc3339")]
+        timestamp: OffsetDateTime,
     },
 }
@@ -165,7 +176,7 @@ mod test {
         ]
     }

-    pub(super) fn datetime_strategy() -> impl Strategy<Value = DateTime<Utc>> {
-        Just(Utc::now())
+    pub(super) fn datetime_strategy() -> impl Strategy<Value = OffsetDateTime> {
+        Just(OffsetDateTime::now_utc())
     }
 }
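
The proptest strategy swapped in above is a constant generator. A compact sketch of why Just satisfies the Strategy bound (the test body is illustrative):

use proptest::prelude::*;
use time::OffsetDateTime;

// `Just` wraps one cloneable value in a Strategy, so every generated
// TaskEvent in these tests carries "now" as its timestamp.
fn datetime_strategy() -> impl Strategy<Value = OffsetDateTime> {
    Just(OffsetDateTime::now_utc())
}

proptest! {
    #[test]
    fn datetime_is_utc(dt in datetime_strategy()) {
        prop_assert_eq!(dt.offset(), time::UtcOffset::UTC);
    }
}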

View file

@@ -5,9 +5,9 @@ use std::io::{BufWriter, Write};
 use std::path::Path;
 use std::sync::Arc;

-use chrono::Utc;
-use heed::{Env, RwTxn};
 use log::debug;
+use milli::heed::{Env, RwTxn};
+use time::OffsetDateTime;

 use super::error::TaskError;
 use super::task::{Task, TaskContent, TaskId};
@@ -61,7 +61,7 @@ impl Clone for TaskStore {
 }

 impl TaskStore {
-    pub fn new(env: Arc<heed::Env>) -> Result<Self> {
+    pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
         let store = Arc::new(Store::new(env)?);
         Ok(Self { store })
     }
@@ -72,7 +72,7 @@ impl TaskStore {
         let task = tokio::task::spawn_blocking(move || -> Result<Task> {
             let mut txn = store.wtxn()?;
             let next_task_id = store.next_task_id(&mut txn)?;
-            let created_at = TaskEvent::Created(Utc::now());
+            let created_at = TaskEvent::Created(OffsetDateTime::now_utc());
             let task = Task {
                 id: next_task_id,
                 index_uid,
@@ -248,7 +248,7 @@ pub mod test {
     }

     impl MockTaskStore {
-        pub fn new(env: Arc<heed::Env>) -> Result<Self> {
+        pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
             Ok(Self::Real(TaskStore::new(env)?))
         }

View file

@@ -1,5 +1,5 @@
 #[allow(clippy::upper_case_acronyms)]
-type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
+type BEU64 = milli::heed::zerocopy::U64<milli::heed::byteorder::BE>;

 const UID_TASK_IDS: &str = "uid_task_id";
 const TASKS: &str = "tasks";
@@ -12,8 +12,8 @@ use std::ops::Range;
 use std::result::Result as StdResult;
 use std::sync::Arc;

-use heed::types::{ByteSlice, OwnedType, SerdeJson, Unit};
-use heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn};
+use milli::heed::types::{ByteSlice, OwnedType, SerdeJson, Unit};
+use milli::heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn};

 use crate::tasks::task::{Task, TaskId};
@@ -73,7 +73,7 @@ impl Store {
     /// be in an invalid state, with dangling processing tasks.
     /// You want to patch all un-finished tasks and put them in your pending
     /// queue with the `reset_and_return_unfinished_update` method.
-    pub fn new(env: Arc<heed::Env>) -> Result<Self> {
+    pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
         let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?;
         let tasks = env.create_database(Some(TASKS))?;
@@ -130,7 +130,7 @@
         let range = from..limit
             .map(|limit| (limit as u64).saturating_add(from))
             .unwrap_or(u64::MAX);
-        let iter: Box<dyn Iterator<Item = StdResult<_, heed::Error>>> = match filter {
+        let iter: Box<dyn Iterator<Item = StdResult<_, milli::heed::Error>>> = match filter {
             Some(
                 ref filter @ TaskFilter {
                     indexes: Some(_), ..
@@ -150,7 +150,7 @@
             ),
         };

-        let apply_fitler = |task: &StdResult<_, heed::Error>| match task {
+        let apply_fitler = |task: &StdResult<_, milli::heed::Error>| match task {
             Ok(ref t) => filter
                 .as_ref()
                 .and_then(|filter| filter.filter_fn.as_ref())
@@ -162,7 +162,7 @@
         let tasks = iter
             .filter(apply_fitler)
             .take(limit.unwrap_or(usize::MAX))
-            .try_fold::<_, _, StdResult<_, heed::Error>>(Vec::new(), |mut v, task| {
+            .try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| {
                 v.push(task?);
                 Ok(v)
             })?;
@@ -172,7 +172,7 @@
     fn compute_candidates(
         &self,
-        txn: &heed::RoTxn,
+        txn: &milli::heed::RoTxn,
         filter: &TaskFilter,
         range: Range<TaskId>,
     ) -> Result<BinaryHeap<TaskId>> {
@@ -188,10 +188,10 @@
                 self.uids_task_ids
                     .remap_key_type::<ByteSlice>()
                     .rev_prefix_iter(txn, &index_uid)?
-                    .map(|entry| -> StdResult<_, heed::Error> {
+                    .map(|entry| -> StdResult<_, milli::heed::Error> {
                         let (key, _) = entry?;
-                        let (_, id) =
-                            IndexUidTaskIdCodec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
+                        let (_, id) = IndexUidTaskIdCodec::bytes_decode(key)
+                            .ok_or(milli::heed::Error::Decoding)?;
                         Ok(id)
                     })
                     .skip_while(|entry| {
@@ -212,7 +212,7 @@
                     // if we encounter an error we returns true to collect it later
                     .unwrap_or(true)
             })
-            .try_for_each::<_, StdResult<(), heed::Error>>(|id| {
+            .try_for_each::<_, StdResult<(), milli::heed::Error>>(|id| {
                 candidates.push(id?);
                 Ok(())
             })?;
@@ -225,8 +225,8 @@
 #[cfg(test)]
 pub mod test {
-    use heed::EnvOpenOptions;
     use itertools::Itertools;
+    use milli::heed::EnvOpenOptions;
     use nelson::Mocker;
     use proptest::collection::vec;
     use proptest::prelude::*;
@@ -244,10 +244,10 @@
         Fake(Mocker),
     }

-    pub struct TmpEnv(TempDir, Arc<heed::Env>);
+    pub struct TmpEnv(TempDir, Arc<milli::heed::Env>);

     impl TmpEnv {
-        pub fn env(&self) -> Arc<heed::Env> {
+        pub fn env(&self) -> Arc<milli::heed::Env> {
             self.1.clone()
         }
     }
@@ -264,7 +264,7 @@
     }

     impl MockStore {
-        pub fn new(env: Arc<heed::Env>) -> Result<Self> {
+        pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
             Ok(Self::Real(Store::new(env)?))
         }
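
One detail worth calling out from the top of this file's diff: task ids are stored as big-endian u64 keys (BEU64) because LMDB orders keys lexicographically by bytes, and only the big-endian encoding makes that order match numeric order. A quick demonstration:

fn main() {
    let (a, b) = (1u64, 256u64);
    // Little-endian bytes sort 256 before 1:
    assert!(b.to_le_bytes() < a.to_le_bytes());
    // Big-endian bytes preserve numeric order, so range scans over
    // monotonically increasing task ids come back in insertion order:
    assert!(a.to_be_bytes() < b.to_be_bytes());
    println!("big-endian keys keep numeric order under byte comparison");
}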

View file

@@ -1,7 +1,7 @@
 use std::sync::Arc;
 use std::time::Duration;

-use chrono::Utc;
+use time::OffsetDateTime;
 use tokio::sync::{watch, RwLock};
 use tokio::time::interval_at;
@@ -63,7 +63,8 @@
         match pending {
             Pending::Batch(mut batch) => {
                 for task in &mut batch.tasks {
-                    task.events.push(TaskEvent::Processing(Utc::now()));
+                    task.events
+                        .push(TaskEvent::Processing(OffsetDateTime::now_utc()));
                 }

                 batch.tasks = {