mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
refactor update handler
This commit is contained in:
parent
9e2a95b1a3
commit
a56db854a2
11 changed files with 522 additions and 246 deletions
|
@ -13,7 +13,8 @@ use tokio::sync::{mpsc, oneshot, RwLock};
|
|||
use uuid::Uuid;
|
||||
|
||||
use super::update_handler::UpdateHandler;
|
||||
use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult};
|
||||
use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}};
|
||||
use crate::index::UpdateResult as UResult;
|
||||
use crate::option::IndexerOpts;
|
||||
use crate::index::{Index, SearchQuery, SearchResult};
|
||||
|
||||
|
|
|
@ -15,6 +15,9 @@ use super::UpdateMeta;
|
|||
use crate::index::{SearchResult, SearchQuery};
|
||||
use actix_web::web::Bytes;
|
||||
|
||||
use crate::index::Settings;
|
||||
use super::UpdateStatus;
|
||||
|
||||
pub struct IndexController {
|
||||
uuid_resolver: uuid_resolver::UuidResolverHandle,
|
||||
index_handle: index_actor::IndexActorHandle,
|
||||
|
@ -69,7 +72,7 @@ impl IndexController {
|
|||
Ok(status)
|
||||
}
|
||||
|
||||
fn clear_documents(&self, index: String) -> anyhow::Result<super::UpdateStatus> {
|
||||
fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
@ -77,7 +80,7 @@ impl IndexController {
|
|||
todo!()
|
||||
}
|
||||
|
||||
fn update_settings(&self, index_uid: String, settings: super::Settings) -> anyhow::Result<super::UpdateStatus> {
|
||||
fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result<super::UpdateStatus> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
@ -100,7 +103,7 @@ impl IndexController {
|
|||
todo!()
|
||||
}
|
||||
|
||||
fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<super::UpdateStatus>> {
|
||||
fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
|
|
@ -10,7 +10,8 @@ use uuid::Uuid;
|
|||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult};
|
||||
use crate::index_controller::{UpdateMeta, UpdateStatus};
|
||||
use crate::index::UpdateResult;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, UpdateError>;
|
||||
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
|
||||
|
|
|
@ -1,17 +1,14 @@
|
|||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::fs::File;
|
||||
|
||||
use anyhow::Result;
|
||||
use flate2::read::GzDecoder;
|
||||
use grenad::CompressionType;
|
||||
use log::info;
|
||||
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
||||
use milli::update::UpdateBuilder;
|
||||
use crate::index::Index;
|
||||
use rayon::ThreadPool;
|
||||
|
||||
use crate::index_controller::updates::{Failed, Processed, Processing};
|
||||
use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult};
|
||||
use crate::index_controller::UpdateMeta;
|
||||
use crate::index::UpdateResult;
|
||||
use crate::option::IndexerOpts;
|
||||
|
||||
pub struct UpdateHandler {
|
||||
|
@ -62,164 +59,6 @@ impl UpdateHandler {
|
|||
update_builder
|
||||
}
|
||||
|
||||
fn update_documents(
|
||||
&self,
|
||||
format: UpdateFormat,
|
||||
method: IndexDocumentsMethod,
|
||||
content: File,
|
||||
update_builder: UpdateBuilder,
|
||||
primary_key: Option<&str>,
|
||||
index: &Index,
|
||||
) -> anyhow::Result<UpdateResult> {
|
||||
info!("performing document addition");
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = index.write_txn()?;
|
||||
|
||||
// Set the primary key if not set already, ignore if already set.
|
||||
match (index.primary_key(&wtxn)?, primary_key) {
|
||||
(None, Some(ref primary_key)) => {
|
||||
index.put_primary_key(&mut wtxn, primary_key)?;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
let mut builder = update_builder.index_documents(&mut wtxn, index);
|
||||
builder.update_format(format);
|
||||
builder.index_documents_method(method);
|
||||
|
||||
let gzipped = false;
|
||||
let reader = if gzipped {
|
||||
Box::new(GzDecoder::new(content))
|
||||
} else {
|
||||
Box::new(content) as Box<dyn io::Read>
|
||||
};
|
||||
|
||||
let result = builder.execute(reader, |indexing_step, update_id| {
|
||||
info!("update {}: {:?}", update_id, indexing_step)
|
||||
});
|
||||
|
||||
info!("document addition done: {:?}", result);
|
||||
|
||||
match result {
|
||||
Ok(addition_result) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::DocumentsAddition(addition_result)))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn clear_documents(&self, update_builder: UpdateBuilder, index: &Index) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = index.write_txn()?;
|
||||
let builder = update_builder.clear_documents(&mut wtxn, index);
|
||||
|
||||
match builder.execute() {
|
||||
Ok(_count) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_settings(
|
||||
&self,
|
||||
settings: &Settings,
|
||||
update_builder: UpdateBuilder,
|
||||
index: &Index,
|
||||
) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = index.write_txn()?;
|
||||
let mut builder = update_builder.settings(&mut wtxn, index);
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref names) = settings.searchable_attributes {
|
||||
match names {
|
||||
Some(names) => builder.set_searchable_fields(names.clone()),
|
||||
None => builder.reset_searchable_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref names) = settings.displayed_attributes {
|
||||
match names {
|
||||
Some(names) => builder.set_displayed_fields(names.clone()),
|
||||
None => builder.reset_displayed_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref facet_types) = settings.faceted_attributes {
|
||||
let facet_types = facet_types.clone().unwrap_or_else(|| HashMap::new());
|
||||
builder.set_faceted_fields(facet_types);
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(ref criteria) = settings.criteria {
|
||||
match criteria {
|
||||
Some(criteria) => builder.set_criteria(criteria.clone()),
|
||||
None => builder.reset_criteria(),
|
||||
}
|
||||
}
|
||||
|
||||
let result = builder
|
||||
.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
|
||||
|
||||
match result {
|
||||
Ok(()) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_facets(
|
||||
&self,
|
||||
levels: &Facets,
|
||||
update_builder: UpdateBuilder,
|
||||
index: &Index,
|
||||
) -> anyhow::Result<UpdateResult> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = index.write_txn()?;
|
||||
let mut builder = update_builder.facets(&mut wtxn, index);
|
||||
if let Some(value) = levels.level_group_size {
|
||||
builder.level_group_size(value);
|
||||
}
|
||||
if let Some(value) = levels.min_level_size {
|
||||
builder.min_level_size(value);
|
||||
}
|
||||
match builder.execute() {
|
||||
Ok(()) => wtxn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::Other))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn delete_documents(
|
||||
&self,
|
||||
document_ids: File,
|
||||
update_builder: UpdateBuilder,
|
||||
index: &Index,
|
||||
) -> anyhow::Result<UpdateResult> {
|
||||
let ids: Vec<String> = serde_json::from_reader(document_ids)?;
|
||||
let mut txn = index.write_txn()?;
|
||||
let mut builder = update_builder.delete_documents(&mut txn, index)?;
|
||||
|
||||
// We ignore unexisting document ids
|
||||
ids.iter().for_each(|id| { builder.delete_external_id(id); });
|
||||
|
||||
match builder.execute() {
|
||||
Ok(deleted) => txn
|
||||
.commit()
|
||||
.and(Ok(UpdateResult::DocumentDeletion { deleted }))
|
||||
.map_err(Into::into),
|
||||
Err(e) => Err(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_update(
|
||||
&self,
|
||||
|
@ -238,18 +77,17 @@ impl UpdateHandler {
|
|||
method,
|
||||
format,
|
||||
primary_key,
|
||||
} => self.update_documents(
|
||||
} => index.update_documents(
|
||||
*format,
|
||||
*method,
|
||||
content,
|
||||
update_builder,
|
||||
primary_key.as_deref(),
|
||||
&index,
|
||||
),
|
||||
ClearDocuments => self.clear_documents(update_builder, &index),
|
||||
DeleteDocuments => self.delete_documents(content, update_builder, &index),
|
||||
Settings(settings) => self.update_settings(settings, update_builder, &index),
|
||||
Facets(levels) => self.update_facets(levels, update_builder, &index),
|
||||
ClearDocuments => index.clear_documents(update_builder),
|
||||
DeleteDocuments => index.delete_documents(content, update_builder),
|
||||
Settings(settings) => index.update_settings(settings, update_builder),
|
||||
Facets(levels) => index.update_facets(levels, update_builder),
|
||||
};
|
||||
|
||||
match result {
|
||||
|
|
|
@ -1,16 +1,13 @@
|
|||
pub mod actor_index_controller;
|
||||
mod updates;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use anyhow::Result;
|
||||
use chrono::{DateTime, Utc};
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
||||
use serde::{Serialize, Deserialize, de::Deserializer};
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub use updates::{Processed, Processing, Failed};
|
||||
use crate::index::{UpdateResult, Settings, Facets};
|
||||
|
||||
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
|
||||
|
||||
|
@ -37,66 +34,7 @@ pub enum UpdateMeta {
|
|||
Facets(Facets),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Facets {
|
||||
pub level_group_size: Option<NonZeroUsize>,
|
||||
pub min_level_size: Option<NonZeroUsize>,
|
||||
}
|
||||
|
||||
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
|
||||
where T: Deserialize<'de>,
|
||||
D: Deserializer<'de>
|
||||
{
|
||||
Deserialize::deserialize(deserializer).map(Some)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Settings {
|
||||
#[serde(
|
||||
default,
|
||||
deserialize_with = "deserialize_some",
|
||||
skip_serializing_if = "Option::is_none",
|
||||
)]
|
||||
pub displayed_attributes: Option<Option<Vec<String>>>,
|
||||
|
||||
#[serde(
|
||||
default,
|
||||
deserialize_with = "deserialize_some",
|
||||
skip_serializing_if = "Option::is_none",
|
||||
)]
|
||||
pub searchable_attributes: Option<Option<Vec<String>>>,
|
||||
|
||||
#[serde(default)]
|
||||
pub faceted_attributes: Option<Option<HashMap<String, String>>>,
|
||||
|
||||
#[serde(
|
||||
default,
|
||||
deserialize_with = "deserialize_some",
|
||||
skip_serializing_if = "Option::is_none",
|
||||
)]
|
||||
pub criteria: Option<Option<Vec<String>>>,
|
||||
}
|
||||
|
||||
impl Settings {
|
||||
pub fn cleared() -> Self {
|
||||
Self {
|
||||
displayed_attributes: Some(None),
|
||||
searchable_attributes: Some(None),
|
||||
faceted_attributes: Some(None),
|
||||
criteria: Some(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateResult {
|
||||
DocumentsAddition(DocumentAdditionResult),
|
||||
DocumentDeletion { deleted: usize },
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct IndexSettings {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue