mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-25 22:34:28 +01:00
I can index documents without meilisearch
This commit is contained in:
parent
edd8344dc9
commit
8770e07397
14
Cargo.lock
generated
14
Cargo.lock
generated
@ -1077,6 +1077,18 @@ dependencies = [
|
|||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "document-formats"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"csv",
|
||||||
|
"either",
|
||||||
|
"meilisearch-types",
|
||||||
|
"milli 0.33.0",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "downcast"
|
name = "downcast"
|
||||||
version = "0.11.0"
|
version = "0.11.0"
|
||||||
@ -1784,10 +1796,12 @@ dependencies = [
|
|||||||
"big_s",
|
"big_s",
|
||||||
"bincode",
|
"bincode",
|
||||||
"csv",
|
"csv",
|
||||||
|
"document-formats",
|
||||||
"file-store",
|
"file-store",
|
||||||
"index",
|
"index",
|
||||||
"insta",
|
"insta",
|
||||||
"log",
|
"log",
|
||||||
|
"meilisearch-types",
|
||||||
"milli 0.33.0",
|
"milli 0.33.0",
|
||||||
"nelson",
|
"nelson",
|
||||||
"roaring 0.9.0",
|
"roaring 0.9.0",
|
||||||
|
@ -6,6 +6,7 @@ members = [
|
|||||||
"meilisearch-lib",
|
"meilisearch-lib",
|
||||||
"meilisearch-auth",
|
"meilisearch-auth",
|
||||||
"index-scheduler",
|
"index-scheduler",
|
||||||
|
"document-formats",
|
||||||
"index",
|
"index",
|
||||||
"file-store",
|
"file-store",
|
||||||
"permissive-json-pointer",
|
"permissive-json-pointer",
|
||||||
|
14
document-formats/Cargo.toml
Normal file
14
document-formats/Cargo.toml
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
[package]
|
||||||
|
name = "document-formats"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
csv = "1.1.6"
|
||||||
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
|
either = { version = "1.6.1", features = ["serde"] }
|
||||||
|
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" }
|
||||||
|
serde_json = { version = "1.0.85", features = ["preserve_order"] }
|
||||||
|
serde = { version = "1.0.136", features = ["derive"] }
|
155
document-formats/src/lib.rs
Normal file
155
document-formats/src/lib.rs
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
use std::borrow::Borrow;
|
||||||
|
use std::fmt::{self, Debug, Display};
|
||||||
|
use std::io::{self, BufReader, Read, Seek, Write};
|
||||||
|
|
||||||
|
use either::Either;
|
||||||
|
use meilisearch_types::error::{Code, ErrorCode};
|
||||||
|
use meilisearch_types::internal_error;
|
||||||
|
use milli::documents::{DocumentsBatchBuilder, Error};
|
||||||
|
use milli::Object;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PayloadType {
|
||||||
|
Ndjson,
|
||||||
|
Json,
|
||||||
|
Csv,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for PayloadType {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
PayloadType::Ndjson => f.write_str("ndjson"),
|
||||||
|
PayloadType::Json => f.write_str("json"),
|
||||||
|
PayloadType::Csv => f.write_str("csv"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DocumentFormatError {
|
||||||
|
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
||||||
|
MalformedPayload(Error, PayloadType),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for DocumentFormatError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
|
||||||
|
Self::MalformedPayload(me, b) => match me.borrow() {
|
||||||
|
Error::Json(se) => {
|
||||||
|
// https://github.com/meilisearch/meilisearch/issues/2107
|
||||||
|
// The user input maybe insanely long. We need to truncate it.
|
||||||
|
let mut serde_msg = se.to_string();
|
||||||
|
let ellipsis = "...";
|
||||||
|
if serde_msg.len() > 100 + ellipsis.len() {
|
||||||
|
serde_msg.replace_range(50..serde_msg.len() - 85, ellipsis);
|
||||||
|
}
|
||||||
|
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
|
||||||
|
b, serde_msg
|
||||||
|
)
|
||||||
|
}
|
||||||
|
_ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for DocumentFormatError {}
|
||||||
|
|
||||||
|
impl From<(PayloadType, Error)> for DocumentFormatError {
|
||||||
|
fn from((ty, error): (PayloadType, Error)) -> Self {
|
||||||
|
match error {
|
||||||
|
Error::Io(e) => Self::Internal(Box::new(e)),
|
||||||
|
e => Self::MalformedPayload(e, ty),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ErrorCode for DocumentFormatError {
|
||||||
|
fn error_code(&self) -> Code {
|
||||||
|
match self {
|
||||||
|
DocumentFormatError::Internal(_) => Code::Internal,
|
||||||
|
DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal_error!(DocumentFormatError: io::Error);
|
||||||
|
|
||||||
|
/// Reads CSV from input and write an obkv batch to writer.
|
||||||
|
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||||
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||||
|
|
||||||
|
let csv = csv::Reader::from_reader(input);
|
||||||
|
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
|
||||||
|
|
||||||
|
let count = builder.documents_count();
|
||||||
|
let _ = builder
|
||||||
|
.into_inner()
|
||||||
|
.map_err(Into::into)
|
||||||
|
.map_err(DocumentFormatError::Internal)?;
|
||||||
|
|
||||||
|
Ok(count as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reads JSON Lines from input and write an obkv batch to writer.
|
||||||
|
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||||
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||||
|
let reader = BufReader::new(input);
|
||||||
|
|
||||||
|
for result in serde_json::Deserializer::from_reader(reader).into_iter() {
|
||||||
|
let object = result
|
||||||
|
.map_err(Error::Json)
|
||||||
|
.map_err(|e| (PayloadType::Ndjson, e))?;
|
||||||
|
builder
|
||||||
|
.append_json_object(&object)
|
||||||
|
.map_err(Into::into)
|
||||||
|
.map_err(DocumentFormatError::Internal)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = builder.documents_count();
|
||||||
|
let _ = builder
|
||||||
|
.into_inner()
|
||||||
|
.map_err(Into::into)
|
||||||
|
.map_err(DocumentFormatError::Internal)?;
|
||||||
|
|
||||||
|
Ok(count as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reads JSON from input and write an obkv batch to writer.
|
||||||
|
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||||
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||||
|
let reader = BufReader::new(input);
|
||||||
|
|
||||||
|
#[derive(Deserialize, Debug)]
|
||||||
|
#[serde(transparent)]
|
||||||
|
struct ArrayOrSingleObject {
|
||||||
|
#[serde(with = "either::serde_untagged")]
|
||||||
|
inner: Either<Vec<Object>, Object>,
|
||||||
|
}
|
||||||
|
|
||||||
|
let content: ArrayOrSingleObject = serde_json::from_reader(reader)
|
||||||
|
.map_err(Error::Json)
|
||||||
|
.map_err(|e| (PayloadType::Json, e))?;
|
||||||
|
|
||||||
|
for object in content.inner.map_right(|o| vec![o]).into_inner() {
|
||||||
|
builder
|
||||||
|
.append_json_object(&object)
|
||||||
|
.map_err(Into::into)
|
||||||
|
.map_err(DocumentFormatError::Internal)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = builder.documents_count();
|
||||||
|
let _ = builder
|
||||||
|
.into_inner()
|
||||||
|
.map_err(Into::into)
|
||||||
|
.map_err(DocumentFormatError::Internal)?;
|
||||||
|
|
||||||
|
Ok(count as usize)
|
||||||
|
}
|
@ -40,7 +40,7 @@ pub struct FileStore {
|
|||||||
#[cfg(not(test))]
|
#[cfg(not(test))]
|
||||||
impl FileStore {
|
impl FileStore {
|
||||||
pub fn new(path: impl AsRef<Path>) -> Result<FileStore> {
|
pub fn new(path: impl AsRef<Path>) -> Result<FileStore> {
|
||||||
let path = path.as_ref().join(UPDATE_FILES_PATH);
|
let path = path.as_ref().to_path_buf();
|
||||||
std::fs::create_dir_all(&path)?;
|
std::fs::create_dir_all(&path)?;
|
||||||
Ok(FileStore { path })
|
Ok(FileStore { path })
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,8 @@ file-store = { path = "../file-store" }
|
|||||||
log = "0.4.14"
|
log = "0.4.14"
|
||||||
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" }
|
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" }
|
||||||
index = { path = "../index" }
|
index = { path = "../index" }
|
||||||
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
|
document-formats = { path = "../document-formats" }
|
||||||
roaring = "0.9.0"
|
roaring = "0.9.0"
|
||||||
serde = { version = "1.0.136", features = ["derive"] }
|
serde = { version = "1.0.136", features = ["derive"] }
|
||||||
tempfile = "3.3.0"
|
tempfile = "3.3.0"
|
||||||
|
@ -459,13 +459,12 @@ impl IndexScheduler {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
// TODO: TAMO: find a way to convert all errors to the `Task::Error` type
|
task.error = Some(error.into());
|
||||||
// task.error = Some(error);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
todo!()
|
Ok(tasks)
|
||||||
}
|
}
|
||||||
Batch::SettingsAndDocumentAddition {
|
Batch::SettingsAndDocumentAddition {
|
||||||
index_uid,
|
index_uid,
|
||||||
|
@ -54,6 +54,7 @@ impl IndexMapper {
|
|||||||
Ok(index) => index,
|
Ok(index) => index,
|
||||||
Err(Error::IndexNotFound(_)) => {
|
Err(Error::IndexNotFound(_)) => {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
|
self.index_mapping.put(wtxn, name, &uuid)?;
|
||||||
Index::open(
|
Index::open(
|
||||||
self.base_path.join(uuid.to_string()),
|
self.base_path.join(uuid.to_string()),
|
||||||
name.to_string(),
|
name.to_string(),
|
||||||
|
@ -91,11 +91,12 @@ impl IndexScheduler {
|
|||||||
let wake_up = SignalEvent::auto(true);
|
let wake_up = SignalEvent::auto(true);
|
||||||
|
|
||||||
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
|
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
|
||||||
|
let file_store = FileStore::new(&update_file_path)?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
// by default there is no processing tasks
|
// by default there is no processing tasks
|
||||||
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
|
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
|
||||||
file_store: FileStore::new(update_file_path)?,
|
file_store,
|
||||||
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
|
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
|
||||||
status: env.create_database(Some(db_name::STATUS))?,
|
status: env.create_database(Some(db_name::STATUS))?,
|
||||||
kind: env.create_database(Some(db_name::KIND))?,
|
kind: env.create_database(Some(db_name::KIND))?,
|
||||||
@ -274,166 +275,6 @@ impl IndexScheduler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(truc)]
|
|
||||||
fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
|
|
||||||
match batch {
|
|
||||||
Batch::One(task) => match &task.kind {
|
|
||||||
KindWithContent::ClearAllDocuments { index_name } => {
|
|
||||||
self.index(&index_name)?.clear_documents()?;
|
|
||||||
}
|
|
||||||
KindWithContent::RenameIndex {
|
|
||||||
index_name: _,
|
|
||||||
new_name,
|
|
||||||
} => {
|
|
||||||
if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) {
|
|
||||||
return Err(Error::IndexAlreadyExists(new_name.to_string()));
|
|
||||||
}
|
|
||||||
todo!("wait for @guigui insight");
|
|
||||||
}
|
|
||||||
KindWithContent::CreateIndex {
|
|
||||||
index_name,
|
|
||||||
primary_key,
|
|
||||||
} => {
|
|
||||||
if self
|
|
||||||
.available_index
|
|
||||||
.get(wtxn, &index_name)?
|
|
||||||
.unwrap_or(false)
|
|
||||||
{
|
|
||||||
return Err(Error::IndexAlreadyExists(index_name.to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
self.available_index.put(wtxn, &index_name, &true)?;
|
|
||||||
// TODO: TAMO: give real info to the index
|
|
||||||
let index = Index::open(
|
|
||||||
index_name.to_string(),
|
|
||||||
index_name.to_string(),
|
|
||||||
100_000_000,
|
|
||||||
Arc::default(),
|
|
||||||
)?;
|
|
||||||
if let Some(primary_key) = primary_key {
|
|
||||||
index.update_primary_key(primary_key.to_string())?;
|
|
||||||
}
|
|
||||||
self.index_map
|
|
||||||
.write()
|
|
||||||
.map_err(|_| Error::CorruptedTaskQueue)?
|
|
||||||
.insert(index_name.to_string(), index.clone());
|
|
||||||
}
|
|
||||||
KindWithContent::DeleteIndex { index_name } => {
|
|
||||||
if !self.available_index.delete(wtxn, &index_name)? {
|
|
||||||
return Err(Error::IndexNotFound(index_name.to_string()));
|
|
||||||
}
|
|
||||||
if let Some(index) = self
|
|
||||||
.index_map
|
|
||||||
.write()
|
|
||||||
.map_err(|_| Error::CorruptedTaskQueue)?
|
|
||||||
.remove(index_name)
|
|
||||||
{
|
|
||||||
index.delete()?;
|
|
||||||
} else {
|
|
||||||
// TODO: TAMO: fix the path
|
|
||||||
std::fs::remove_file(index_name)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
KindWithContent::SwapIndex { lhs, rhs } => {
|
|
||||||
if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) {
|
|
||||||
return Err(Error::IndexNotFound(lhs.to_string()));
|
|
||||||
}
|
|
||||||
if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) {
|
|
||||||
return Err(Error::IndexNotFound(rhs.to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?;
|
|
||||||
let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?;
|
|
||||||
// the bitmap are lazily created and thus may not exists.
|
|
||||||
if let Some(bitmap) = rhs_bitmap {
|
|
||||||
self.index_tasks.put(wtxn, lhs, &bitmap)?;
|
|
||||||
}
|
|
||||||
if let Some(bitmap) = lhs_bitmap {
|
|
||||||
self.index_tasks.put(wtxn, rhs, &bitmap)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut index_map = self
|
|
||||||
.index_map
|
|
||||||
.write()
|
|
||||||
.map_err(|_| Error::CorruptedTaskQueue)?;
|
|
||||||
|
|
||||||
let lhs_index = index_map.remove(lhs).unwrap();
|
|
||||||
let rhs_index = index_map.remove(rhs).unwrap();
|
|
||||||
|
|
||||||
index_map.insert(lhs.to_string(), rhs_index);
|
|
||||||
index_map.insert(rhs.to_string(), lhs_index);
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
|
||||||
},
|
|
||||||
Batch::Cancel(_) => todo!(),
|
|
||||||
Batch::Snapshot(_) => todo!(),
|
|
||||||
Batch::Dump(_) => todo!(),
|
|
||||||
Batch::Contiguous { tasks, kind } => {
|
|
||||||
// it's safe because you can't batch 0 contiguous tasks.
|
|
||||||
let first_task = &tasks[0];
|
|
||||||
// and the two kind of tasks we batch MUST have ONE index name.
|
|
||||||
let index_name = first_task.indexes().unwrap()[0];
|
|
||||||
let index = self.index(index_name)?;
|
|
||||||
|
|
||||||
match kind {
|
|
||||||
Kind::DocumentAddition => {
|
|
||||||
let content_files = tasks.iter().map(|task| match &task.kind {
|
|
||||||
KindWithContent::DocumentAddition { content_file, .. } => {
|
|
||||||
content_file.clone()
|
|
||||||
}
|
|
||||||
k => unreachable!(
|
|
||||||
"Internal error, `{:?}` is not supposed to be reachable here",
|
|
||||||
k.as_kind()
|
|
||||||
),
|
|
||||||
});
|
|
||||||
let results = index.update_documents(
|
|
||||||
IndexDocumentsMethod::UpdateDocuments,
|
|
||||||
None,
|
|
||||||
self.file_store.clone(),
|
|
||||||
content_files,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for (task, result) in tasks.iter_mut().zip(results) {
|
|
||||||
task.finished_at = Some(OffsetDateTime::now_utc());
|
|
||||||
match result {
|
|
||||||
Ok(_) => task.status = Status::Succeeded,
|
|
||||||
Err(_) => task.status = Status::Succeeded,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Kind::DocumentDeletion => {
|
|
||||||
let ids: Vec<_> = tasks
|
|
||||||
.iter()
|
|
||||||
.flat_map(|task| match &task.kind {
|
|
||||||
KindWithContent::DocumentDeletion { documents_ids, .. } => {
|
|
||||||
documents_ids.clone()
|
|
||||||
}
|
|
||||||
k => unreachable!(
|
|
||||||
"Internal error, `{:?}` is not supposed to be reachable here",
|
|
||||||
k.as_kind()
|
|
||||||
),
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let result = index.delete_documents(&ids);
|
|
||||||
|
|
||||||
for task in tasks.iter_mut() {
|
|
||||||
task.finished_at = Some(OffsetDateTime::now_utc());
|
|
||||||
match result {
|
|
||||||
Ok(_) => task.status = Status::Succeeded,
|
|
||||||
Err(_) => task.status = Status::Succeeded,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Batch::Empty => todo!(),
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Notify the scheduler there is or may be work to do.
|
/// Notify the scheduler there is or may be work to do.
|
||||||
pub fn notify(&self) {
|
pub fn notify(&self) {
|
||||||
self.wake_up.signal()
|
self.wake_up.signal()
|
||||||
@ -443,34 +284,15 @@ impl IndexScheduler {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
use insta::assert_debug_snapshot;
|
|
||||||
use tempfile::TempDir;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::assert_smol_debug_snapshot;
|
use crate::{assert_smol_debug_snapshot, tests::index_scheduler};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
fn new() -> IndexScheduler {
|
|
||||||
let dir = TempDir::new().unwrap();
|
|
||||||
IndexScheduler::new(
|
|
||||||
dir.path().join("db_path"),
|
|
||||||
dir.path().join("file_store"),
|
|
||||||
dir.path().join("indexes"),
|
|
||||||
100_000_000,
|
|
||||||
IndexerConfig::default(),
|
|
||||||
)
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn simple_new() {
|
|
||||||
new();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn register() {
|
fn register() {
|
||||||
let index_scheduler = new();
|
let (index_scheduler, _) = index_scheduler();
|
||||||
let kinds = [
|
let kinds = [
|
||||||
KindWithContent::IndexCreation {
|
KindWithContent::IndexCreation {
|
||||||
index_uid: S("catto"),
|
index_uid: S("catto"),
|
||||||
@ -541,4 +363,42 @@ mod tests {
|
|||||||
|
|
||||||
assert_smol_debug_snapshot!(index_tasks, @r###"[("catto", RoaringBitmap<[0, 1, 3]>), ("doggo", RoaringBitmap<[4]>)]"###);
|
assert_smol_debug_snapshot!(index_tasks, @r###"[("catto", RoaringBitmap<[0, 1, 3]>), ("doggo", RoaringBitmap<[4]>)]"###);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn document_addition() {
|
||||||
|
let (index_scheduler, _dir) = index_scheduler();
|
||||||
|
|
||||||
|
let content = r#"
|
||||||
|
{
|
||||||
|
"id": 1,
|
||||||
|
"doggo": "bob"
|
||||||
|
}"#;
|
||||||
|
|
||||||
|
let (uuid, mut file) = index_scheduler.file_store.new_update().unwrap();
|
||||||
|
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
|
||||||
|
file.persist().unwrap();
|
||||||
|
|
||||||
|
index_scheduler
|
||||||
|
.register(KindWithContent::DocumentAddition {
|
||||||
|
index_uid: S("doggos"),
|
||||||
|
primary_key: Some(S("id")),
|
||||||
|
content_file: uuid,
|
||||||
|
documents_count: 100,
|
||||||
|
allow_index_creation: true,
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
index_scheduler.tick().unwrap();
|
||||||
|
|
||||||
|
let doggos = index_scheduler.index("doggos").unwrap();
|
||||||
|
|
||||||
|
let rtxn = doggos.read_txn().unwrap();
|
||||||
|
let documents: Vec<_> = doggos
|
||||||
|
.all_documents(&rtxn)
|
||||||
|
.unwrap()
|
||||||
|
.collect::<std::result::Result<_, _>>()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_smol_debug_snapshot!(documents, @r###"[{"id": Number(1), "doggo": String("bob")}]"###);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,11 @@ pub use task::TaskView as Task;
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use milli::update::IndexerConfig;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
use crate::IndexScheduler;
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! assert_smol_debug_snapshot {
|
macro_rules! assert_smol_debug_snapshot {
|
||||||
($value:expr, @$snapshot:literal) => {{
|
($value:expr, @$snapshot:literal) => {{
|
||||||
@ -32,5 +37,26 @@ mod tests {
|
|||||||
let value = format!("{:?}", $value);
|
let value = format!("{:?}", $value);
|
||||||
insta::assert_snapshot!($crate::_macro_support::AutoName, value, stringify!($value));
|
insta::assert_snapshot!($crate::_macro_support::AutoName, value, stringify!($value));
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn index_scheduler() -> (IndexScheduler, TempDir) {
|
||||||
|
let dir = TempDir::new().unwrap();
|
||||||
|
|
||||||
|
(
|
||||||
|
IndexScheduler::new(
|
||||||
|
dir.path().join("db_path"),
|
||||||
|
dir.path().join("file_store"),
|
||||||
|
dir.path().join("indexes"),
|
||||||
|
1024 * 1024,
|
||||||
|
IndexerConfig::default(),
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
dir,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn simple_new() {
|
||||||
|
index_scheduler();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use index::{Settings, Unchecked};
|
use index::{Settings, Unchecked};
|
||||||
|
use meilisearch_types::error::ResponseError;
|
||||||
|
|
||||||
use milli::DocumentId;
|
use milli::DocumentId;
|
||||||
use serde::{Deserialize, Serialize, Serializer};
|
use serde::{Deserialize, Serialize, Serializer};
|
||||||
@ -18,16 +19,6 @@ pub enum Status {
|
|||||||
Failed,
|
Failed,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct Error {
|
|
||||||
message: String,
|
|
||||||
code: String,
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
kind: String,
|
|
||||||
link: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct TaskView {
|
pub struct TaskView {
|
||||||
@ -38,7 +29,7 @@ pub struct TaskView {
|
|||||||
pub kind: Kind,
|
pub kind: Kind,
|
||||||
|
|
||||||
pub details: Option<Details>,
|
pub details: Option<Details>,
|
||||||
pub error: Option<Error>,
|
pub error: Option<ResponseError>,
|
||||||
|
|
||||||
#[serde(serialize_with = "serialize_duration")]
|
#[serde(serialize_with = "serialize_duration")]
|
||||||
pub duration: Option<Duration>,
|
pub duration: Option<Duration>,
|
||||||
@ -62,7 +53,7 @@ pub struct Task {
|
|||||||
#[serde(with = "time::serde::rfc3339::option")]
|
#[serde(with = "time::serde::rfc3339::option")]
|
||||||
pub finished_at: Option<OffsetDateTime>,
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
|
|
||||||
pub error: Option<Error>,
|
pub error: Option<ResponseError>,
|
||||||
pub details: Option<Details>,
|
pub details: Option<Details>,
|
||||||
|
|
||||||
pub status: Status,
|
pub status: Status,
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
use meilisearch_types::error::{Code, ErrorCode};
|
use meilisearch_types::error::{Code, ErrorCode};
|
||||||
use meilisearch_types::internal_error;
|
use meilisearch_types::internal_error;
|
||||||
|
use milli::UserError;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, IndexError>;
|
pub type Result<T> = std::result::Result<T, IndexError>;
|
||||||
@ -27,6 +29,17 @@ internal_error!(
|
|||||||
milli::documents::Error
|
milli::documents::Error
|
||||||
);
|
);
|
||||||
|
|
||||||
|
impl ErrorCode for IndexError {
|
||||||
|
fn error_code(&self) -> Code {
|
||||||
|
match self {
|
||||||
|
IndexError::Internal(_) => Code::Internal,
|
||||||
|
IndexError::DocumentNotFound(_) => Code::DocumentNotFound,
|
||||||
|
IndexError::Facet(e) => e.error_code(),
|
||||||
|
IndexError::Milli(e) => MilliError(e).error_code(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<milli::UserError> for IndexError {
|
impl From<milli::UserError> for IndexError {
|
||||||
fn from(error: milli::UserError) -> IndexError {
|
fn from(error: milli::UserError) -> IndexError {
|
||||||
IndexError::Milli(error.into())
|
IndexError::Milli(error.into())
|
||||||
@ -46,3 +59,53 @@ impl ErrorCode for FacetError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MilliError<'a>(pub &'a milli::Error);
|
||||||
|
|
||||||
|
impl Error for MilliError<'_> {}
|
||||||
|
|
||||||
|
impl fmt::Display for MilliError<'_> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ErrorCode for MilliError<'_> {
|
||||||
|
fn error_code(&self) -> Code {
|
||||||
|
match self.0 {
|
||||||
|
milli::Error::InternalError(_) => Code::Internal,
|
||||||
|
milli::Error::IoError(_) => Code::Internal,
|
||||||
|
milli::Error::UserError(ref error) => {
|
||||||
|
match error {
|
||||||
|
// TODO: wait for spec for new error codes.
|
||||||
|
UserError::SerdeJson(_)
|
||||||
|
| UserError::InvalidLmdbOpenOptions
|
||||||
|
| UserError::DocumentLimitReached
|
||||||
|
| UserError::AccessingSoftDeletedDocument { .. }
|
||||||
|
| UserError::UnknownInternalDocumentId { .. } => Code::Internal,
|
||||||
|
UserError::InvalidStoreFile => Code::InvalidStore,
|
||||||
|
UserError::NoSpaceLeftOnDevice => Code::NoSpaceLeftOnDevice,
|
||||||
|
UserError::MaxDatabaseSizeReached => Code::DatabaseSizeLimitReached,
|
||||||
|
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
|
||||||
|
UserError::InvalidFilter(_) => Code::Filter,
|
||||||
|
UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
|
||||||
|
UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
|
||||||
|
Code::InvalidDocumentId
|
||||||
|
}
|
||||||
|
UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
|
||||||
|
UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
|
||||||
|
UserError::SortRankingRuleMissing => Code::Sort,
|
||||||
|
UserError::InvalidFacetsDistribution { .. } => Code::BadRequest,
|
||||||
|
UserError::InvalidSortableAttribute { .. } => Code::Sort,
|
||||||
|
UserError::CriterionError(_) => Code::InvalidRankingRule,
|
||||||
|
UserError::InvalidGeoField { .. } => Code::InvalidGeoField,
|
||||||
|
UserError::SortError(_) => Code::Sort,
|
||||||
|
UserError::InvalidMinTypoWordLenSetting(_, _) => {
|
||||||
|
Code::InvalidMinWordLengthForTypo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -248,26 +248,20 @@ impl Index {
|
|||||||
limit: usize,
|
limit: usize,
|
||||||
attributes_to_retrieve: Option<Vec<S>>,
|
attributes_to_retrieve: Option<Vec<S>>,
|
||||||
) -> Result<(u64, Vec<Document>)> {
|
) -> Result<(u64, Vec<Document>)> {
|
||||||
let txn = self.read_txn()?;
|
let rtxn = self.read_txn()?;
|
||||||
|
|
||||||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
|
||||||
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
|
||||||
|
|
||||||
let mut documents = Vec::new();
|
let mut documents = Vec::new();
|
||||||
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
|
for document in self.all_documents(&rtxn)?.skip(offset).take(limit) {
|
||||||
let (_id, obkv) = entry?;
|
|
||||||
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
|
|
||||||
let document = match &attributes_to_retrieve {
|
let document = match &attributes_to_retrieve {
|
||||||
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
|
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
|
||||||
&document,
|
&document?,
|
||||||
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
||||||
),
|
),
|
||||||
None => document,
|
None => document?,
|
||||||
};
|
};
|
||||||
documents.push(document);
|
documents.push(document);
|
||||||
}
|
}
|
||||||
|
let number_of_documents = self.number_of_documents(&rtxn)?;
|
||||||
let number_of_documents = self.number_of_documents(&txn)?;
|
|
||||||
|
|
||||||
Ok((number_of_documents, documents))
|
Ok((number_of_documents, documents))
|
||||||
}
|
}
|
||||||
@ -306,6 +300,21 @@ impl Index {
|
|||||||
Ok(document)
|
Ok(document)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn all_documents<'a>(
|
||||||
|
&self,
|
||||||
|
rtxn: &'a RoTxn,
|
||||||
|
) -> Result<impl Iterator<Item = Result<Document>> + 'a> {
|
||||||
|
let fields_ids_map = self.fields_ids_map(&rtxn)?;
|
||||||
|
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||||
|
|
||||||
|
Ok(self.inner.all_documents(&rtxn)?.map(move |ret| {
|
||||||
|
ret.map_err(IndexError::from)
|
||||||
|
.and_then(|(_key, document)| -> Result<_> {
|
||||||
|
Ok(obkv_to_json(&all_fields, &fields_ids_map, document)?)
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn size(&self) -> Result<u64> {
|
pub fn size(&self) -> Result<u64> {
|
||||||
Ok(self.inner.on_disk_size()?)
|
Ok(self.inner.on_disk_size()?)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user