format the whole project

Tamo 2021-06-16 18:33:33 +02:00
parent ba30cef987
commit 9716fb3b36
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
68 changed files with 3327 additions and 2336 deletions
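The hunks below are the output of reformatting the codebase: long `use` lists are merged and sorted into single groups, struct literals and method chains are collapsed onto one line whenever they fit the line-width limit, and multi-line match arms gain explicit braces. This is consistent with running rustfmt with a non-default configuration, though the exact options are not shown in this diff. As a minimal sketch of the resulting style (the names and values below are invented for illustration, not code from this repository):

// A minimal sketch, not code from this repository: it only illustrates the
// formatting conventions applied throughout the diff below.
use std::fs::File;

#[derive(Debug)]
struct Answer {
    documents: usize,
    number_of_candidates: u64,
}

fn main() -> std::io::Result<()> {
    // Struct literals stay on one line when they fit the width limit,
    // as with `Answer` and `UpdatesTemplate` further down in the diff.
    let answer = Answer { documents: 3, number_of_candidates: 42 };

    // Builder-style call chains are likewise kept on a single line when short.
    let db_size = File::open("Cargo.toml")?.metadata()?.len() as usize;

    println!("{:?} stored in a {} byte database", answer, db_size);
    Ok(())
}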

View file

@@ -1,6 +1,5 @@
mod update_store;
use std::{io, mem};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Display;
use std::fs::{create_dir_all, File};
@@ -10,16 +9,19 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use std::{io, mem};
use askama_warp::Template;
use byte_unit::Byte;
use either::Either;
use flate2::read::GzDecoder;
use futures::{FutureExt, StreamExt};
use futures::stream;
use futures::{stream, FutureExt, StreamExt};
use grenad::CompressionType;
use heed::EnvOpenOptions;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use milli::update::UpdateIndexingStep::*;
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat};
use milli::{obkv_to_json, FilterCondition, Index, MatchingWords, SearchResult};
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use serde::{Deserialize, Serialize};
@@ -28,12 +30,9 @@ use structopt::StructOpt;
use tokio::fs::File as TFile;
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;
use warp::{Filter, http::Response};
use warp::filters::ws::Message;
use milli::{FilterCondition, Index, MatchingWords, obkv_to_json, SearchResult};
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat};
use milli::update::UpdateIndexingStep::*;
use warp::http::Response;
use warp::Filter;
use self::update_store::UpdateStore;
@@ -149,25 +148,28 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
for (word, token) in analyzed.reconstruct() {
if token.is_word() {
let to_highlight = matching_words.matching_bytes(token.text()).is_some();
if to_highlight { string.push_str("<mark>") }
if to_highlight {
string.push_str("<mark>")
}
string.push_str(word);
if to_highlight { string.push_str("</mark>") }
if to_highlight {
string.push_str("</mark>")
}
} else {
string.push_str(word);
}
}
Value::String(string)
}
Value::Array(values) => {
Value::Array(values.into_iter()
.map(|v| self.highlight_value(v, matching_words))
.collect())
}
Value::Object(object) => {
Value::Object(object.into_iter()
Value::Array(values) => Value::Array(
values.into_iter().map(|v| self.highlight_value(v, matching_words)).collect(),
),
Value::Object(object) => Value::Object(
object
.into_iter()
.map(|(k, v)| (k, self.highlight_value(v, matching_words)))
.collect())
}
.collect(),
),
}
}
@@ -236,12 +238,7 @@ enum UpdateMeta {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMetaProgress {
DocumentsAddition {
step: usize,
total_steps: usize,
current: usize,
total: Option<usize>,
},
DocumentsAddition { step: usize, total_steps: usize, current: usize, total: Option<usize> },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -342,157 +339,185 @@ async fn main() -> anyhow::Result<()> {
update_builder.max_memory(indexer_opt_cloned.max_memory.get_bytes() as usize);
update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size);
update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type);
update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes());
update_builder
.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes());
let before_update = Instant::now();
// we extract the update type and execute the update itself.
let result: anyhow::Result<()> = match meta {
UpdateMeta::DocumentsAddition { method, format, encoding } => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
let result: anyhow::Result<()> =
match meta {
UpdateMeta::DocumentsAddition { method, format, encoding } => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
match format.as_str() {
"csv" => builder.update_format(UpdateFormat::Csv),
"json" => builder.update_format(UpdateFormat::Json),
"json-stream" => builder.update_format(UpdateFormat::JsonStream),
otherwise => panic!("invalid update format {:?}", otherwise),
};
match method.as_str() {
"replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
"update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
otherwise => panic!("invalid indexing method {:?}", otherwise),
};
let reader = match encoding.as_deref() {
Some("gzip") => Box::new(GzDecoder::new(content)),
None => Box::new(content) as Box<dyn io::Read>,
otherwise => panic!("invalid encoding format {:?}", otherwise),
};
let result = builder.execute(reader, |indexing_step, update_id| {
let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
match format.as_str() {
"csv" => builder.update_format(UpdateFormat::Csv),
"json" => builder.update_format(UpdateFormat::Json),
"json-stream" => builder.update_format(UpdateFormat::JsonStream),
otherwise => panic!("invalid update format {:?}", otherwise),
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
});
match result {
Ok(_) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
UpdateMeta::ClearDocuments => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let builder = update_builder.clear_documents(&mut wtxn, &index_cloned);
match builder.execute() {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
UpdateMeta::Settings(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &index_cloned);
// We transpose the settings JSON struct into a real setting update.
match settings.searchable_attributes {
Setting::Set(searchable_attributes) => builder.set_searchable_fields(searchable_attributes),
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
match settings.displayed_attributes {
Setting::Set(displayed_attributes) => builder.set_displayed_fields(displayed_attributes),
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
match settings.filterable_attributes {
Setting::Set(filterable_attributes) => builder.set_filterable_fields(filterable_attributes),
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
match settings.criteria {
Setting::Set(criteria) => builder.set_criteria(criteria),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
match settings.stop_words {
Setting::Set(stop_words) => builder.set_stop_words(stop_words),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => ()
}
// We transpose the settings JSON struct into a real setting update.
match settings.synonyms {
Setting::Set(synonyms) => builder.set_synonyms(synonyms),
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => ()
}
let result = builder.execute(|indexing_step, update_id| {
let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
match method.as_str() {
"replace" => builder
.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
"update" => builder
.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
otherwise => panic!("invalid indexing method {:?}", otherwise),
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
});
match result {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
let reader = match encoding.as_deref() {
Some("gzip") => Box::new(GzDecoder::new(content)),
None => Box::new(content) as Box<dyn io::Read>,
otherwise => panic!("invalid encoding format {:?}", otherwise),
};
let result = builder.execute(reader, |indexing_step, update_id| {
let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => {
(documents_seen, None)
}
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
IndexDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
(databases_seen, Some(total_databases))
}
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
});
match result {
Ok(_) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
}
UpdateMeta::Facets(levels) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.facets(&mut wtxn, &index_cloned);
if let Some(value) = levels.level_group_size {
builder.level_group_size(value);
UpdateMeta::ClearDocuments => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let builder = update_builder.clear_documents(&mut wtxn, &index_cloned);
match builder.execute() {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
if let Some(value) = levels.min_level_size {
builder.min_level_size(value);
UpdateMeta::Settings(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &index_cloned);
// We transpose the settings JSON struct into a real setting update.
match settings.searchable_attributes {
Setting::Set(searchable_attributes) => {
builder.set_searchable_fields(searchable_attributes)
}
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.displayed_attributes {
Setting::Set(displayed_attributes) => {
builder.set_displayed_fields(displayed_attributes)
}
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.filterable_attributes {
Setting::Set(filterable_attributes) => {
builder.set_filterable_fields(filterable_attributes)
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.criteria {
Setting::Set(criteria) => builder.set_criteria(criteria),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.stop_words {
Setting::Set(stop_words) => builder.set_stop_words(stop_words),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.synonyms {
Setting::Set(synonyms) => builder.set_synonyms(synonyms),
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
let result = builder.execute(|indexing_step, update_id| {
let (current, total) = match indexing_step {
TransformFromUserIntoGenericFormat { documents_seen } => {
(documents_seen, None)
}
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
IndexDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
(databases_seen, Some(total_databases))
}
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
});
match result {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
UpdateMeta::Facets(levels) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.facets(&mut wtxn, &index_cloned);
if let Some(value) = levels.level_group_size {
builder.level_group_size(value);
}
if let Some(value) = levels.min_level_size {
builder.min_level_size(value);
}
match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
}
};
};
let meta = match result {
Ok(()) => format!("valid update content processed in {:.02?}", before_update.elapsed()),
Ok(()) => {
format!("valid update content processed in {:.02?}", before_update.elapsed())
}
Err(e) => format!("error while processing update content: {:?}", e),
};
@@ -500,7 +525,8 @@ async fn main() -> anyhow::Result<()> {
let _ = update_status_sender_cloned.send(processed);
Ok(meta)
})?;
},
)?;
// The database name will not change.
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
@@ -512,15 +538,11 @@ async fn main() -> anyhow::Result<()> {
let db_name_cloned = db_name.clone();
let lmdb_path_cloned = lmdb_path.clone();
let index_cloned = index.clone();
let dash_html_route = warp::filters::method::get()
.and(warp::filters::path::end())
.map(move || {
let dash_html_route =
warp::filters::method::get().and(warp::filters::path::end()).map(move || {
// We retrieve the database size.
let db_size = File::open(lmdb_path_cloned.clone())
.unwrap()
.metadata()
.unwrap()
.len() as usize;
let db_size =
File::open(lmdb_path_cloned.clone()).unwrap().metadata().unwrap().len() as usize;
// And the number of documents in the database.
let rtxn = index_cloned.read_txn().unwrap();
@@ -537,111 +559,105 @@ async fn main() -> anyhow::Result<()> {
.and(warp::path!("updates"))
.map(move |header: String| {
let update_store = update_store_cloned.clone();
let mut updates = update_store.iter_metas(|processed, aborted, pending| {
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
for result in processed {
let (uid, meta) = result?;
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
}
for result in aborted {
let (uid, meta) = result?;
updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
}
for result in pending {
let (uid, meta) = result?;
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
}
Ok(updates)
}).unwrap();
let mut updates = update_store
.iter_metas(|processed, aborted, pending| {
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
for result in processed {
let (uid, meta) = result?;
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
}
for result in aborted {
let (uid, meta) = result?;
updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
}
for result in pending {
let (uid, meta) = result?;
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
}
Ok(updates)
})
.unwrap();
updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse());
if header.contains("text/html") {
// We retrieve the database size.
let db_size = File::open(lmdb_path_cloned.clone())
.unwrap()
.metadata()
.unwrap()
.len() as usize;
let db_size =
File::open(lmdb_path_cloned.clone()).unwrap().metadata().unwrap().len()
as usize;
// And the number of documents in the database.
let rtxn = index_cloned.read_txn().unwrap();
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
let template = UpdatesTemplate {
db_name: db_name.clone(),
db_size,
docs_count,
updates,
};
let template =
UpdatesTemplate { db_name: db_name.clone(), db_size, docs_count, updates };
Box::new(template) as Box<dyn warp::Reply>
} else {
Box::new(warp::reply::json(&updates))
}
});
let dash_bulma_route = warp::filters::method::get()
.and(warp::path!("bulma.min.css"))
.map(|| Response::builder()
.header("content-type", "text/css; charset=utf-8")
.body(include_str!("../public/bulma.min.css"))
);
let dash_bulma_route =
warp::filters::method::get().and(warp::path!("bulma.min.css")).map(|| {
Response::builder()
.header("content-type", "text/css; charset=utf-8")
.body(include_str!("../public/bulma.min.css"))
});
let dash_bulma_dark_route = warp::filters::method::get()
.and(warp::path!("bulma-prefers-dark.min.css"))
.map(|| Response::builder()
.header("content-type", "text/css; charset=utf-8")
.body(include_str!("../public/bulma-prefers-dark.min.css"))
);
let dash_bulma_dark_route =
warp::filters::method::get().and(warp::path!("bulma-prefers-dark.min.css")).map(|| {
Response::builder()
.header("content-type", "text/css; charset=utf-8")
.body(include_str!("../public/bulma-prefers-dark.min.css"))
});
let dash_style_route = warp::filters::method::get()
.and(warp::path!("style.css"))
.map(|| Response::builder()
let dash_style_route = warp::filters::method::get().and(warp::path!("style.css")).map(|| {
Response::builder()
.header("content-type", "text/css; charset=utf-8")
.body(include_str!("../public/style.css"))
);
});
let dash_jquery_route = warp::filters::method::get()
.and(warp::path!("jquery-3.4.1.min.js"))
.map(|| Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/jquery-3.4.1.min.js"))
);
let dash_jquery_route =
warp::filters::method::get().and(warp::path!("jquery-3.4.1.min.js")).map(|| {
Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/jquery-3.4.1.min.js"))
});
let dash_filesize_route = warp::filters::method::get()
.and(warp::path!("filesize.min.js"))
.map(|| Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/filesize.min.js"))
);
let dash_filesize_route =
warp::filters::method::get().and(warp::path!("filesize.min.js")).map(|| {
Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/filesize.min.js"))
});
let dash_script_route = warp::filters::method::get()
.and(warp::path!("script.js"))
.map(|| Response::builder()
let dash_script_route = warp::filters::method::get().and(warp::path!("script.js")).map(|| {
Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/script.js"))
);
});
let updates_script_route = warp::filters::method::get()
.and(warp::path!("updates-script.js"))
.map(|| Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/updates-script.js"))
);
let updates_script_route =
warp::filters::method::get().and(warp::path!("updates-script.js")).map(|| {
Response::builder()
.header("content-type", "application/javascript; charset=utf-8")
.body(include_str!("../public/updates-script.js"))
});
let dash_logo_white_route = warp::filters::method::get()
.and(warp::path!("logo-white.svg"))
.map(|| Response::builder()
.header("content-type", "image/svg+xml")
.body(include_str!("../public/logo-white.svg"))
);
let dash_logo_white_route =
warp::filters::method::get().and(warp::path!("logo-white.svg")).map(|| {
Response::builder()
.header("content-type", "image/svg+xml")
.body(include_str!("../public/logo-white.svg"))
});
let dash_logo_black_route = warp::filters::method::get()
.and(warp::path!("logo-black.svg"))
.map(|| Response::builder()
.header("content-type", "image/svg+xml")
.body(include_str!("../public/logo-black.svg"))
);
let dash_logo_black_route =
warp::filters::method::get().and(warp::path!("logo-black.svg")).map(|| {
Response::builder()
.header("content-type", "image/svg+xml")
.body(include_str!("../public/logo-black.svg"))
});
#[derive(Debug, Deserialize)]
#[serde(untagged)]
@@ -719,7 +735,8 @@ async fn main() -> anyhow::Result<()> {
search.filter(condition);
}
let SearchResult { matching_words, candidates, documents_ids } = search.execute().unwrap();
let SearchResult { matching_words, candidates, documents_ids } =
search.execute().unwrap();
let number_of_candidates = candidates.len();
let facets = if query.facet_distribution == Some(true) {
@@ -745,17 +762,18 @@ async fn main() -> anyhow::Result<()> {
for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
if !disable_highlighting {
highlighter.highlight_record(&mut object, &matching_words, &attributes_to_highlight);
highlighter.highlight_record(
&mut object,
&matching_words,
&attributes_to_highlight,
);
}
documents.push(object);
}
let answer = Answer {
documents,
number_of_candidates,
facets: facets.unwrap_or_default(),
};
let answer =
Answer { documents, number_of_candidates, facets: facets.unwrap_or_default() };
Response::builder()
.header("Content-Type", "application/json")
@@ -764,9 +782,8 @@ async fn main() -> anyhow::Result<()> {
});
let index_cloned = index.clone();
let document_route = warp::filters::method::get()
.and(warp::path!("document" / String))
.map(move |id: String| {
let document_route = warp::filters::method::get().and(warp::path!("document" / String)).map(
move |id: String| {
let index = index_cloned.clone();
let rtxn = index.read_txn().unwrap();
@@ -780,30 +797,31 @@ async fn main() -> anyhow::Result<()> {
match external_documents_ids.get(&id) {
Some(document_id) => {
let document_id = document_id as u32;
let (_, obkv) = index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap();
let (_, obkv) =
index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap();
let document = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
Response::builder()
.header("Content-Type", "application/json")
.body(serde_json::to_string(&document).unwrap())
}
None => {
Response::builder()
.status(404)
.body(format!("Document with id {:?} not found.", id))
}
None => Response::builder()
.status(404)
.body(format!("Document with id {:?} not found.", id)),
}
});
},
);
async fn buf_stream(
update_store: Arc<UpdateStore<UpdateMeta, String>>,
update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
update_status_sender: broadcast::Sender<
UpdateStatus<UpdateMeta, UpdateMetaProgress, String>,
>,
update_method: Option<String>,
update_format: UpdateFormat,
encoding: Option<String>,
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
) -> Result<impl warp::Reply, warp::Rejection>
{
mut stream: impl futures::Stream<Item = Result<impl bytes::Buf, warp::Error>> + Unpin,
) -> Result<impl warp::Reply, warp::Rejection> {
let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
let mut file = TFile::from_std(file);
@@ -869,9 +887,8 @@ async fn main() -> anyhow::Result<()> {
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let clearing_route = warp::filters::method::post()
.and(warp::path!("clear-documents"))
.map(move || {
let clearing_route =
warp::filters::method::post().and(warp::path!("clear-documents")).map(move || {
let meta = UpdateMeta::ClearDocuments;
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
@@ -919,9 +936,8 @@ async fn main() -> anyhow::Result<()> {
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let abort_pending_updates_route = warp::filters::method::delete()
.and(warp::path!("updates"))
.map(move || {
let abort_pending_updates_route =
warp::filters::method::delete().and(warp::path!("updates")).map(move || {
let updates = update_store_cloned.abort_pendings().unwrap();
for (update_id, meta) in updates {
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
@@ -930,25 +946,22 @@ async fn main() -> anyhow::Result<()> {
warp::reply()
});
let update_ws_route = warp::ws()
.and(warp::path!("updates" / "ws"))
.map(move |ws: warp::ws::Ws| {
let update_ws_route =
warp::ws().and(warp::path!("updates" / "ws")).map(move |ws: warp::ws::Ws| {
// And then our closure will be called when it completes...
let update_status_receiver = update_status_sender.subscribe();
ws.on_upgrade(|websocket| {
// Just echo all updates messages...
update_status_receiver
.into_stream()
.flat_map(|result| {
match result {
Ok(status) => {
let msg = serde_json::to_string(&status).unwrap();
stream::iter(Some(Ok(Message::text(msg))))
}
Err(e) => {
eprintln!("channel error: {:?}", e);
stream::iter(None)
}
.flat_map(|result| match result {
Ok(status) => {
let msg = serde_json::to_string(&status).unwrap();
stream::iter(Some(Ok(Message::text(msg))))
}
Err(e) => {
eprintln!("channel error: {:?}", e);
stream::iter(None)
}
})
.forward(websocket)
@@ -988,10 +1001,9 @@ async fn main() -> anyhow::Result<()> {
#[cfg(test)]
mod tests {
use maplit::{btreeset,hashmap, hashset};
use serde_test::{assert_tokens, Token};
use maplit::{btreeset, hashmap, hashset};
use milli::update::Setting;
use serde_test::{assert_tokens, Token};
use crate::Settings;
@@ -1000,50 +1012,53 @@ mod tests {
let settings = Settings {
displayed_attributes: Setting::Set(vec!["name".to_string()]),
searchable_attributes: Setting::Set(vec!["age".to_string()]),
filterable_attributes: Setting::Set(hashset!{ "age".to_string() }),
filterable_attributes: Setting::Set(hashset! { "age".to_string() }),
criteria: Setting::Set(vec!["asc(age)".to_string()]),
stop_words: Setting::Set(btreeset! { "and".to_string() }),
synonyms: Setting::Set(hashmap!{ "alex".to_string() => vec!["alexey".to_string()] })
synonyms: Setting::Set(hashmap! { "alex".to_string() => vec!["alexey".to_string()] }),
};
assert_tokens(&settings, &[
Token::Struct { name: "Settings", len: 6 },
Token::Str("displayedAttributes"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("name"),
Token::SeqEnd,
Token::Str("searchableAttributes"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("age"),
Token::SeqEnd,
Token::Str("facetedAttributes"),
Token::Some,
Token::Map { len: Some(1) },
Token::Str("age"),
Token::Str("integer"),
Token::MapEnd,
Token::Str("criteria"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("asc(age)"),
Token::SeqEnd,
Token::Str("stopWords"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("and"),
Token::SeqEnd,
Token::Str("synonyms"),
Token::Some,
Token::Map { len: Some(1) },
Token::Str("alex"),
Token::Seq {len: Some(1) },
Token::Str("alexey"),
Token::SeqEnd,
Token::MapEnd,
Token::StructEnd,
]);
assert_tokens(
&settings,
&[
Token::Struct { name: "Settings", len: 6 },
Token::Str("displayedAttributes"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("name"),
Token::SeqEnd,
Token::Str("searchableAttributes"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("age"),
Token::SeqEnd,
Token::Str("facetedAttributes"),
Token::Some,
Token::Map { len: Some(1) },
Token::Str("age"),
Token::Str("integer"),
Token::MapEnd,
Token::Str("criteria"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("asc(age)"),
Token::SeqEnd,
Token::Str("stopWords"),
Token::Some,
Token::Seq { len: Some(1) },
Token::Str("and"),
Token::SeqEnd,
Token::Str("synonyms"),
Token::Some,
Token::Map { len: Some(1) },
Token::Str("alex"),
Token::Seq { len: Some(1) },
Token::Str("alexey"),
Token::SeqEnd,
Token::MapEnd,
Token::StructEnd,
],
);
}
#[test]
@@ -1057,22 +1072,25 @@ mod tests {
synonyms: Setting::Reset,
};
assert_tokens(&settings, &[
Token::Struct { name: "Settings", len: 6 },
Token::Str("displayedAttributes"),
Token::None,
Token::Str("searchableAttributes"),
Token::None,
Token::Str("facetedAttributes"),
Token::None,
Token::Str("criteria"),
Token::None,
Token::Str("stopWords"),
Token::None,
Token::Str("synonyms"),
Token::None,
Token::StructEnd,
]);
assert_tokens(
&settings,
&[
Token::Struct { name: "Settings", len: 6 },
Token::Str("displayedAttributes"),
Token::None,
Token::Str("searchableAttributes"),
Token::None,
Token::Str("facetedAttributes"),
Token::None,
Token::Str("criteria"),
Token::None,
Token::Str("stopWords"),
Token::None,
Token::Str("synonyms"),
Token::None,
Token::StructEnd,
],
);
}
#[test]
@@ -1086,9 +1104,6 @@ mod tests {
synonyms: Setting::NotSet,
};
assert_tokens(&settings, &[
Token::Struct { name: "Settings", len: 0 },
Token::StructEnd,
]);
assert_tokens(&settings, &[Token::Struct { name: "Settings", len: 0 }, Token::StructEnd]);
}
}

View file

@@ -4,9 +4,9 @@ use std::path::Path;
use std::sync::Arc;
use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
use heed::{EnvOpenOptions, Env, Database};
use serde::{Serialize, Deserialize};
use heed::types::{ByteSlice, DecodeIgnore, OwnedType, SerdeJson};
use heed::{Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
@@ -25,7 +25,9 @@ }
}
impl<M, N, F> UpdateHandler<M, N> for F
where F: FnMut(u64, M, &[u8]) -> heed::Result<N> + Send + 'static {
where
F: FnMut(u64, M, &[u8]) -> heed::Result<N> + Send + 'static,
{
fn handle_update(&mut self, update_id: u64, meta: M, content: &[u8]) -> heed::Result<N> {
self(update_id, meta, content)
}
@@ -82,26 +84,17 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
/// Returns the new biggest id to use to store the new update.
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
let last_pending = self.pending_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
let last_pending =
self.pending_meta.remap_data_type::<DecodeIgnore>().last(txn)?.map(|(k, _)| k.get());
let last_processed = self.processed_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
let last_processed =
self.processed_meta.remap_data_type::<DecodeIgnore>().last(txn)?.map(|(k, _)| k.get());
let last_aborted = self.aborted_meta
.remap_data_type::<DecodeIgnore>()
.last(txn)?
.map(|(k, _)| k.get());
let last_aborted =
self.aborted_meta.remap_data_type::<DecodeIgnore>().last(txn)?.map(|(k, _)| k.get());
let last_update_id = [last_pending, last_processed, last_aborted]
.iter()
.copied()
.flatten()
.max();
let last_update_id =
[last_pending, last_processed, last_aborted].iter().copied().flatten().max();
match last_update_id {
Some(last_id) => Ok(last_id + 1),
@@ -112,7 +105,8 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
/// Registers the update content in the pending store and the meta
/// into the pending-meta store. Returns the new unique update id.
pub fn register_update(&self, meta: &M, content: &[u8]) -> heed::Result<u64>
where M: Serialize,
where
M: Serialize,
{
let mut wtxn = self.env.write_txn()?;
@@ -152,9 +146,8 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
// a reader while processing it, not a writer.
match first_meta {
Some((first_id, first_meta)) => {
let first_content = self.pending
.get(&rtxn, &first_id)?
.expect("associated update content");
let first_content =
self.pending.get(&rtxn, &first_id)?.expect("associated update content");
// Process the pending update using the provided user function.
let new_meta = handler.handle_update(first_id.get(), first_meta, first_content)?;
@@ -170,15 +163,16 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
wtxn.commit()?;
Ok(Some((first_id.get(), new_meta)))
},
None => Ok(None)
}
None => Ok(None),
}
}
/// The id and metadata of the update that is currently being processed,
/// `None` if no update is being processed.
pub fn processing_update(&self) -> heed::Result<Option<(u64, M)>>
where M: for<'a> Deserialize<'a>,
where
M: for<'a> Deserialize<'a>,
{
let rtxn = self.env.read_txn()?;
match self.pending_meta.first(&rtxn)? {
@@ -242,7 +236,8 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
/// that as already been processed or which doesn't actually exist, will
/// return `None`.
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<M>>
where M: Serialize + for<'a> Deserialize<'a>,
where
M: Serialize + for<'a> Deserialize<'a>,
{
let mut wtxn = self.env.write_txn()?;
let key = BEU64::new(update_id);
@@ -269,7 +264,8 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
/// Aborts all the pending updates, and not the one being currently processed.
/// Returns the update metas and ids that were successfully aborted.
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, M)>>
where M: Serialize + for<'a> Deserialize<'a>,
where
M: Serialize + for<'a> Deserialize<'a>,
{
let mut wtxn = self.env.write_txn()?;
let mut aborted_updates = Vec::new();
@@ -303,17 +299,19 @@ pub enum UpdateStatusMeta<M, N> {
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::{Duration, Instant};
use super::*;
#[test]
fn simple() {
let dir = tempfile::tempdir().unwrap();
let options = EnvOpenOptions::new();
let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content:&_| {
let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content: &_| {
Ok(meta + " processed")
}).unwrap();
})
.unwrap();
let meta = String::from("kiki");
let update_id = update_store.register_update(&meta, &[]).unwrap();
@@ -329,10 +327,11 @@ mod tests {
fn long_running_update() {
let dir = tempfile::tempdir().unwrap();
let options = EnvOpenOptions::new();
let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content:&_| {
let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content: &_| {
thread::sleep(Duration::from_millis(400));
Ok(meta + " processed")
}).unwrap();
})
.unwrap();
let before_register = Instant::now();