mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-05 12:38:55 +01:00
documetn addition and search
This commit is contained in:
parent
74410d8c6b
commit
8183202868
@ -61,7 +61,12 @@ impl Data {
|
|||||||
let path = options.db_path.clone();
|
let path = options.db_path.clone();
|
||||||
let indexer_opts = options.indexer_options.clone();
|
let indexer_opts = options.indexer_options.clone();
|
||||||
create_dir_all(&path)?;
|
create_dir_all(&path)?;
|
||||||
let index_controller = LocalIndexController::new(&path, indexer_opts)?;
|
let index_controller = LocalIndexController::new(
|
||||||
|
&path,
|
||||||
|
indexer_opts,
|
||||||
|
options.max_mdb_size.get_bytes(),
|
||||||
|
options.max_udb_size.get_bytes(),
|
||||||
|
)?;
|
||||||
let indexes = Arc::new(index_controller);
|
let indexes = Arc::new(index_controller);
|
||||||
|
|
||||||
let mut api_keys = ApiKeys {
|
let mut api_keys = ApiKeys {
|
||||||
|
@ -6,7 +6,8 @@ use futures_util::stream::StreamExt;
|
|||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
use super::Data;
|
use super::Data;
|
||||||
use crate::index_controller::{IndexController, UpdateStatusResponse, Settings};
|
use crate::index_controller::{IndexController, Settings, UpdateResult, UpdateMeta};
|
||||||
|
use crate::index_controller::updates::UpdateStatus;
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
pub async fn add_documents<B, E, S>(
|
pub async fn add_documents<B, E, S>(
|
||||||
@ -15,7 +16,7 @@ impl Data {
|
|||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
format: UpdateFormat,
|
format: UpdateFormat,
|
||||||
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
|
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
|
||||||
) -> anyhow::Result<UpdateStatusResponse>
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>>
|
||||||
where
|
where
|
||||||
B: Deref<Target = [u8]>,
|
B: Deref<Target = [u8]>,
|
||||||
E: std::error::Error + Send + Sync + 'static,
|
E: std::error::Error + Send + Sync + 'static,
|
||||||
@ -45,7 +46,7 @@ impl Data {
|
|||||||
&self,
|
&self,
|
||||||
index: S,
|
index: S,
|
||||||
settings: Settings
|
settings: Settings
|
||||||
) -> anyhow::Result<UpdateStatusResponse> {
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
|
||||||
let indexes = self.index_controller.clone();
|
let indexes = self.index_controller.clone();
|
||||||
let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??;
|
let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??;
|
||||||
Ok(update.into())
|
Ok(update.into())
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::fs::create_dir_all;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
@ -8,6 +9,7 @@ use milli::Index;
|
|||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
use log::warn;
|
||||||
|
|
||||||
use super::update_store::UpdateStore;
|
use super::update_store::UpdateStore;
|
||||||
use super::update_handler::UpdateHandler;
|
use super::update_handler::UpdateHandler;
|
||||||
@ -15,8 +17,8 @@ use crate::option::IndexerOpts;
|
|||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
struct IndexMeta {
|
struct IndexMeta {
|
||||||
update_size: usize,
|
update_size: u64,
|
||||||
index_size: usize,
|
index_size: u64,
|
||||||
uid: Uuid,
|
uid: Uuid,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -30,12 +32,15 @@ impl IndexMeta {
|
|||||||
let update_path = make_update_db_path(&path, &self.uid);
|
let update_path = make_update_db_path(&path, &self.uid);
|
||||||
let index_path = make_index_db_path(&path, &self.uid);
|
let index_path = make_index_db_path(&path, &self.uid);
|
||||||
|
|
||||||
|
create_dir_all(&update_path)?;
|
||||||
|
create_dir_all(&index_path)?;
|
||||||
|
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(self.index_size);
|
options.map_size(self.index_size as usize);
|
||||||
let index = Arc::new(Index::new(options, index_path)?);
|
let index = Arc::new(Index::new(options, index_path)?);
|
||||||
|
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(self.update_size);
|
options.map_size(self.update_size as usize);
|
||||||
let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?;
|
let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?;
|
||||||
let update_store = UpdateStore::open(options, update_path, handler)?;
|
let update_store = UpdateStore::open(options, update_path, handler)?;
|
||||||
Ok((index, update_store))
|
Ok((index, update_store))
|
||||||
@ -132,8 +137,8 @@ impl IndexStore {
|
|||||||
|
|
||||||
pub fn get_or_create_index(
|
pub fn get_or_create_index(
|
||||||
&self, name: impl AsRef<str>,
|
&self, name: impl AsRef<str>,
|
||||||
update_size: usize,
|
update_size: u64,
|
||||||
index_size: usize,
|
index_size: u64,
|
||||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
||||||
let mut txn = self.env.write_txn()?;
|
let mut txn = self.env.write_txn()?;
|
||||||
match self._get_index(&txn, name.as_ref())? {
|
match self._get_index(&txn, name.as_ref())? {
|
||||||
@ -141,17 +146,39 @@ impl IndexStore {
|
|||||||
None => {
|
None => {
|
||||||
let uid = Uuid::new_v4();
|
let uid = Uuid::new_v4();
|
||||||
// TODO: clean in case of error
|
// TODO: clean in case of error
|
||||||
Ok(self.create_index(&mut txn, uid, name, update_size, index_size)?)
|
let result = self.create_index(&mut txn, uid, name, update_size, index_size);
|
||||||
|
match result {
|
||||||
|
Ok((index, update_store)) => {
|
||||||
|
match txn.commit() {
|
||||||
|
Ok(_) => Ok((index, update_store)),
|
||||||
|
Err(e) => {
|
||||||
|
self.clean_uid(&uid);
|
||||||
|
Err(anyhow::anyhow!("error creating index: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
self.clean_uid(&uid);
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// removes all data acociated with an index Uuid. This is called when index creation failed
|
||||||
|
/// and outstanding files and data need to be cleaned.
|
||||||
|
fn clean_uid(&self, _uid: &Uuid) {
|
||||||
|
// TODO!
|
||||||
|
warn!("creating cleanup is not yet implemented");
|
||||||
|
}
|
||||||
|
|
||||||
fn create_index( &self,
|
fn create_index( &self,
|
||||||
txn: &mut RwTxn,
|
txn: &mut RwTxn,
|
||||||
uid: Uuid,
|
uid: Uuid,
|
||||||
name: impl AsRef<str>,
|
name: impl AsRef<str>,
|
||||||
update_size: usize,
|
update_size: u64,
|
||||||
index_size: usize,
|
index_size: u64,
|
||||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
||||||
let meta = IndexMeta { update_size, index_size, uid: uid.clone() };
|
let meta = IndexMeta { update_size, index_size, uid: uid.clone() };
|
||||||
|
|
||||||
|
@ -11,30 +11,46 @@ use milli::Index;
|
|||||||
|
|
||||||
use crate::option::IndexerOpts;
|
use crate::option::IndexerOpts;
|
||||||
use super::IndexController;
|
use super::IndexController;
|
||||||
|
use super::updates::UpdateStatus;
|
||||||
|
use super::{UpdateMeta, UpdateResult};
|
||||||
|
|
||||||
pub struct LocalIndexController {
|
pub struct LocalIndexController {
|
||||||
indexes: IndexStore,
|
indexes: IndexStore,
|
||||||
|
update_db_size: u64,
|
||||||
|
index_db_size: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LocalIndexController {
|
impl LocalIndexController {
|
||||||
pub fn new(path: impl AsRef<Path>, opt: IndexerOpts) -> anyhow::Result<Self> {
|
pub fn new(
|
||||||
|
path: impl AsRef<Path>,
|
||||||
|
opt: IndexerOpts,
|
||||||
|
index_db_size: u64,
|
||||||
|
update_db_size: u64,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
let indexes = IndexStore::new(path, opt)?;
|
let indexes = IndexStore::new(path, opt)?;
|
||||||
Ok(Self { indexes })
|
Ok(Self { indexes, index_db_size, update_db_size })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IndexController for LocalIndexController {
|
impl IndexController for LocalIndexController {
|
||||||
fn add_documents<S: AsRef<str>>(
|
fn add_documents<S: AsRef<str>>(
|
||||||
&self,
|
&self,
|
||||||
_index: S,
|
index: S,
|
||||||
_method: milli::update::IndexDocumentsMethod,
|
method: milli::update::IndexDocumentsMethod,
|
||||||
_format: milli::update::UpdateFormat,
|
format: milli::update::UpdateFormat,
|
||||||
_data: &[u8],
|
data: &[u8],
|
||||||
) -> anyhow::Result<super::UpdateStatusResponse> {
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
|
||||||
todo!()
|
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
|
||||||
|
let meta = UpdateMeta::DocumentsAddition { method, format };
|
||||||
|
let pending = update_store.register_update(meta, data).unwrap();
|
||||||
|
Ok(pending.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_settings<S: AsRef<str>>(&self, _index_uid: S, _settings: super::Settings) -> anyhow::Result<super::UpdateStatusResponse> {
|
fn update_settings<S: AsRef<str>>(
|
||||||
|
&self,
|
||||||
|
_index_uid: S,
|
||||||
|
_settings: super::Settings
|
||||||
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,6 +188,8 @@ impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
|
|||||||
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
|
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
|
||||||
use UpdateMeta::*;
|
use UpdateMeta::*;
|
||||||
|
|
||||||
|
println!("handling update {}", update_id);
|
||||||
|
|
||||||
let update_builder = self.update_buidler(update_id);
|
let update_builder = self.update_buidler(update_id);
|
||||||
|
|
||||||
let result = match meta.meta() {
|
let result = match meta.meta() {
|
||||||
@ -197,6 +199,8 @@ impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
|
|||||||
Facets(levels) => self.update_facets(levels, update_builder),
|
Facets(levels) => self.update_facets(levels, update_builder),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
println!("{:?}", result);
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(result) => Ok(meta.process(result)),
|
Ok(result) => Ok(meta.process(result)),
|
||||||
Err(e) => Err(meta.fail(e.to_string())),
|
Err(e) => Err(meta.fail(e.to_string())),
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
mod local_index_controller;
|
mod local_index_controller;
|
||||||
mod updates;
|
pub mod updates;
|
||||||
|
|
||||||
pub use local_index_controller::LocalIndexController;
|
pub use local_index_controller::LocalIndexController;
|
||||||
|
|
||||||
@ -12,9 +12,8 @@ use milli::Index;
|
|||||||
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
||||||
use serde::{Serialize, Deserialize, de::Deserializer};
|
use serde::{Serialize, Deserialize, de::Deserializer};
|
||||||
|
|
||||||
use updates::{Processed, Processing, Failed, Pending, Aborted};
|
use updates::{Processed, Processing, Failed, UpdateStatus};
|
||||||
|
|
||||||
pub type UpdateStatusResponse = UpdateStatus<UpdateMeta, UpdateResult, String>;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(tag = "type")]
|
#[serde(tag = "type")]
|
||||||
@ -33,15 +32,6 @@ pub struct Facets {
|
|||||||
pub min_level_size: Option<NonZeroUsize>,
|
pub min_level_size: Option<NonZeroUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(tag = "type")]
|
|
||||||
pub enum UpdateStatus<M, P, N> {
|
|
||||||
Pending { update_id: u64, meta: Pending<M> },
|
|
||||||
Progressing { update_id: u64, meta: P },
|
|
||||||
Processed { update_id: u64, meta: Processed<M, N> },
|
|
||||||
Aborted { update_id: u64, meta: Aborted<M> },
|
|
||||||
}
|
|
||||||
|
|
||||||
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
|
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
|
||||||
where T: Deserialize<'de>,
|
where T: Deserialize<'de>,
|
||||||
D: Deserializer<'de>
|
D: Deserializer<'de>
|
||||||
@ -116,11 +106,11 @@ pub trait IndexController {
|
|||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
format: UpdateFormat,
|
format: UpdateFormat,
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
) -> anyhow::Result<UpdateStatusResponse>;
|
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>>;
|
||||||
|
|
||||||
/// Updates an index settings. If the index does not exist, it will be created when the update
|
/// Updates an index settings. If the index does not exist, it will be created when the update
|
||||||
/// is applied to the index.
|
/// is applied to the index.
|
||||||
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatusResponse>;
|
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>>;
|
||||||
|
|
||||||
/// Create an index with the given `index_uid`.
|
/// Create an index with the given `index_uid`.
|
||||||
fn create_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
|
fn create_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
|
||||||
|
Loading…
Reference in New Issue
Block a user