refactor(http, update): introduce setting enum

This commit is contained in:
Alexey Shekhirin 2021-04-07 14:33:44 +03:00
parent 2bcdd8844c
commit dc636d190d
No known key found for this signature in database
GPG key ID: AF9A26AA133B5B98
4 changed files with 160 additions and 148 deletions

View file

@ -1,38 +1,38 @@
use std::{io, mem};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Display;
use std::fs::{File, create_dir_all};
use std::fs::{create_dir_all, File};
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use std::{mem, io};
use askama_warp::Template;
use byte_unit::Byte;
use either::Either;
use flate2::read::GzDecoder;
use futures::stream;
use futures::{FutureExt, StreamExt};
use futures::stream;
use grenad::CompressionType;
use heed::EnvOpenOptions;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use serde::{Serialize, Deserialize, Deserializer};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use structopt::StructOpt;
use tokio::fs::File as TFile;
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;
use warp::filters::ws::Message;
use warp::{Filter, http::Response};
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use warp::filters::ws::Message;
use milli::{FacetCondition, Index, MatchingWords, obkv_to_json, SearchResult, UpdateStore};
use milli::facet::FacetValue;
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat};
use milli::update::UpdateIndexingStep::*;
use milli::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat};
use milli::{obkv_to_json, Index, UpdateStore, SearchResult, MatchingWords, FacetCondition};
// Program-wide slot for a rayon `ThreadPool`; it is created empty here via `OnceCell::new()`.
// NOTE(review): where the pool is built and stored is not visible in this excerpt — confirm.
static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
@ -154,17 +154,17 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
}
}
Value::String(string)
},
}
Value::Array(values) => {
Value::Array(values.into_iter()
.map(|v| self.highlight_value(v, matching_words))
.collect())
},
}
Value::Object(object) => {
Value::Object(object.into_iter()
.map(|(k, v)| (k, self.highlight_value(v, matching_words)))
.collect())
},
}
}
}
@ -246,36 +246,20 @@ enum UpdateMetaProgress {
// Payload describing a settings update for the index.
// Per the serde attributes below, unknown keys are rejected and the field
// names are exposed in camelCase.
// With the `Setting<T>` declarations, each field is matched as
// `Setting::Set(..)`, `Setting::Reset` or `Setting::NotSet` in the
// `UpdateMeta::Settings` arm of the update closure, and a field for which
// `Setting::is_not_set` returns true is omitted when serializing.
// NOTE(review): this excerpt lists every field twice (the `Option`-based
// declaration and the `Setting<T>` declaration); only one form can remain in
// compilable source — presumably the `Setting<T>` one, confirm.
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct Settings {
#[serde(
default,
deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none",
)]
displayed_attributes: Option<Option<Vec<String>>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
displayed_attributes: Setting<Vec<String>>,
#[serde(
default,
deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none",
)]
searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
searchable_attributes: Setting<Vec<String>>,
#[serde(default)]
faceted_attributes: Option<HashMap<String, String>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
faceted_attributes: Setting<HashMap<String, String>>,
#[serde(
default,
deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none",
)]
criteria: Option<Option<Vec<String>>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
criteria: Setting<Vec<String>>,
#[serde(
default,
deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none",
)]
stop_words: Option<Option<BTreeSet<String>>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
stop_words: Setting<BTreeSet<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -294,14 +278,6 @@ struct WordsPrefixes {
max_prefix_length: Option<usize>,
}
// Any value that is present is considered Some value, including null.
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where T: Deserialize<'de>,
D: Deserializer<'de>
{
Deserialize::deserialize(deserializer).map(Some)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
@ -339,7 +315,7 @@ async fn main() -> anyhow::Result<()> {
update_store_options,
update_store_path,
// the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
move |update_id, meta, content:&_| {
move |update_id, meta, content: &_| {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {
@ -396,7 +372,7 @@ async fn main() -> anyhow::Result<()> {
total_steps: indexing_step.number_of_steps(),
current,
total,
}
},
});
});
@ -404,7 +380,7 @@ async fn main() -> anyhow::Result<()> {
Ok(_) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into())
}
},
}
UpdateMeta::ClearDocuments => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
@ -414,47 +390,45 @@ async fn main() -> anyhow::Result<()> {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into())
}
},
}
UpdateMeta::Settings(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &index_cloned);
// We transpose the settings JSON struct into a real setting update.
if let Some(names) = settings.searchable_attributes {
match names {
Some(names) => builder.set_searchable_fields(names),
None => builder.reset_searchable_fields(),
}
match settings.searchable_attributes {
Setting::Set(searchable_attributes) => builder.set_searchable_fields(searchable_attributes),
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
if let Some(names) = settings.displayed_attributes {
match names {
Some(names) => builder.set_displayed_fields(names),
None => builder.reset_displayed_fields(),
}
match settings.displayed_attributes {
Setting::Set(displayed_attributes) => builder.set_displayed_fields(displayed_attributes),
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
if let Some(facet_types) = settings.faceted_attributes {
builder.set_faceted_fields(facet_types);
match settings.faceted_attributes {
Setting::Set(faceted_attributes) => builder.set_faceted_fields(faceted_attributes),
Setting::Reset => builder.reset_faceted_fields(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
if let Some(criteria) = settings.criteria {
match criteria {
Some(criteria) => builder.set_criteria(criteria),
None => builder.reset_criteria(),
}
match settings.criteria {
Setting::Set(criteria) => builder.set_criteria(criteria),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
if let Some(stop_words) = settings.stop_words {
match stop_words {
Some(stop_words) => builder.set_stop_words(stop_words),
None => builder.reset_stop_words(),
}
match settings.stop_words {
Setting::Set(stop_words) => builder.set_stop_words(stop_words),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => ()
}
let result = builder.execute(|indexing_step, update_id| {
@ -471,7 +445,7 @@ async fn main() -> anyhow::Result<()> {
total_steps: indexing_step.number_of_steps(),
current,
total,
}
},
});
});
@ -479,7 +453,7 @@ async fn main() -> anyhow::Result<()> {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into())
}
},
}
UpdateMeta::Facets(levels) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
@ -494,7 +468,7 @@ async fn main() -> anyhow::Result<()> {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into())
}
},
}
UpdateMeta::WordsPrefixes(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
@ -716,7 +690,7 @@ async fn main() -> anyhow::Result<()> {
let filters = match query.filters {
Some(condition) if !condition.trim().is_empty() => {
Some(FacetCondition::from_str(&rtxn, &index, &condition).unwrap())
},
}
_otherwise => None,
};
@ -724,14 +698,14 @@ async fn main() -> anyhow::Result<()> {
Some(array) => {
let eithers = array.into_iter().map(Into::into);
FacetCondition::from_array(&rtxn, &index, eithers).unwrap()
},
}
_otherwise => None,
};
let condition = match (filters, facet_filters) {
(Some(filters), Some(facet_filters)) => {
Some(FacetCondition::And(Box::new(filters), Box::new(facet_filters)))
},
}
(Some(condition), None) | (None, Some(condition)) => Some(condition),
_otherwise => None,
};
@ -807,12 +781,12 @@ async fn main() -> anyhow::Result<()> {
Response::builder()
.header("Content-Type", "application/json")
.body(serde_json::to_string(&document).unwrap())
},
}
None => {
Response::builder()
.status(404)
.body(format!("Document with id {:?} not found.", id))
},
}
}
});
@ -978,11 +952,11 @@ async fn main() -> anyhow::Result<()> {
Ok(status) => {
let msg = serde_json::to_string(&status).unwrap();
stream::iter(Some(Ok(Message::text(msg))))
},
}
Err(e) => {
eprintln!("channel error: {:?}", e);
stream::iter(None)
},
}
}
})
.forward(websocket)