mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 21:04:27 +01:00
implement the documents batcher
This commit is contained in:
parent
1d73f484f0
commit
392ee86714
@ -1,18 +1,20 @@
|
|||||||
use actix_web::HttpRequest;
|
use actix_web::HttpRequest;
|
||||||
use meilisearch_lib::index::SearchQuery;
|
use meilisearch_lib::index::SearchQuery;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::collections::{HashMap, HashSet};
|
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::fs::read_to_string;
|
use std::fs::read_to_string;
|
||||||
|
|
||||||
|
use crate::routes::indexes::documents::UpdateDocumentsQuery;
|
||||||
use crate::Opt;
|
use crate::Opt;
|
||||||
|
|
||||||
// if we are in release mode and the feature analytics was enabled
|
// if we are in release mode and the feature analytics was enabled
|
||||||
#[cfg(all(not(debug_assertions), feature = "analytics"))]
|
#[cfg(all(not(debug_assertions), feature = "analytics"))]
|
||||||
mod segment {
|
mod segment {
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
|
use crate::routes::indexes::documents::UpdateDocumentsQuery;
|
||||||
use actix_web::http::header::USER_AGENT;
|
use actix_web::http::header::USER_AGENT;
|
||||||
use actix_web::HttpRequest;
|
use actix_web::HttpRequest;
|
||||||
|
use http::header::CONTENT_TYPE;
|
||||||
use meilisearch_lib::index::SearchQuery;
|
use meilisearch_lib::index::SearchQuery;
|
||||||
use meilisearch_lib::index_controller::Stats;
|
use meilisearch_lib::index_controller::Stats;
|
||||||
use meilisearch_lib::MeiliSearch;
|
use meilisearch_lib::MeiliSearch;
|
||||||
@ -38,6 +40,8 @@ mod segment {
|
|||||||
batcher: Mutex<AutoBatcher>,
|
batcher: Mutex<AutoBatcher>,
|
||||||
post_search_batcher: Mutex<SearchBatcher>,
|
post_search_batcher: Mutex<SearchBatcher>,
|
||||||
get_search_batcher: Mutex<SearchBatcher>,
|
get_search_batcher: Mutex<SearchBatcher>,
|
||||||
|
documents_added_batcher: Mutex<DocumentsBatcher>,
|
||||||
|
documents_updated_batcher: Mutex<DocumentsBatcher>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SegmentAnalytics {
|
impl SegmentAnalytics {
|
||||||
@ -110,6 +114,8 @@ mod segment {
|
|||||||
batcher,
|
batcher,
|
||||||
post_search_batcher: Mutex::new(SearchBatcher::default()),
|
post_search_batcher: Mutex::new(SearchBatcher::default()),
|
||||||
get_search_batcher: Mutex::new(SearchBatcher::default()),
|
get_search_batcher: Mutex::new(SearchBatcher::default()),
|
||||||
|
documents_added_batcher: Mutex::new(DocumentsBatcher::default()),
|
||||||
|
documents_updated_batcher: Mutex::new(DocumentsBatcher::default()),
|
||||||
});
|
});
|
||||||
let segment = Box::leak(segment);
|
let segment = Box::leak(segment);
|
||||||
|
|
||||||
@ -158,6 +164,17 @@ mod segment {
|
|||||||
post_search
|
post_search
|
||||||
.into_event(self.user.clone(), "Document Searched POST".to_string())
|
.into_event(self.user.clone(), "Document Searched POST".to_string())
|
||||||
});
|
});
|
||||||
|
let add_documents =
|
||||||
|
std::mem::take(&mut *self.documents_added_batcher.lock().await);
|
||||||
|
let add_documents = (add_documents.updated).then(|| {
|
||||||
|
add_documents.into_event(self.user.clone(), "Documents Added".to_string())
|
||||||
|
});
|
||||||
|
let update_documents =
|
||||||
|
std::mem::take(&mut *self.documents_updated_batcher.lock().await);
|
||||||
|
let update_documents = (update_documents.updated).then(|| {
|
||||||
|
update_documents
|
||||||
|
.into_event(self.user.clone(), "Documents Updated".to_string())
|
||||||
|
});
|
||||||
// keep the lock on the batcher just for these three operations
|
// keep the lock on the batcher just for these three operations
|
||||||
{
|
{
|
||||||
println!("ANALYTICS: taking the lock on the batcher");
|
println!("ANALYTICS: taking the lock on the batcher");
|
||||||
@ -168,6 +185,12 @@ mod segment {
|
|||||||
if let Some(post_search) = post_search {
|
if let Some(post_search) = post_search {
|
||||||
let _ = batcher.push(post_search).await;
|
let _ = batcher.push(post_search).await;
|
||||||
}
|
}
|
||||||
|
if let Some(add_documents) = add_documents {
|
||||||
|
let _ = batcher.push(add_documents).await;
|
||||||
|
}
|
||||||
|
if let Some(update_documents) = update_documents {
|
||||||
|
let _ = batcher.push(update_documents).await;
|
||||||
|
}
|
||||||
println!("ANALYTICS: Sending the batch");
|
println!("ANALYTICS: Sending the batch");
|
||||||
let _ = batcher.flush().await;
|
let _ = batcher.flush().await;
|
||||||
}
|
}
|
||||||
@ -199,8 +222,8 @@ mod segment {
|
|||||||
let max_limit = query.limit;
|
let max_limit = query.limit;
|
||||||
let max_offset = query.offset.unwrap_or_default();
|
let max_offset = query.offset.unwrap_or_default();
|
||||||
|
|
||||||
// to avoid blocking the search we are going to do the heavier computation in an async task
|
// to avoid blocking the search we are going to do the heavier computation and take the
|
||||||
// and take the mutex in the same task
|
// batcher's mutex in an async task
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let filtered = filter.is_some() as usize;
|
let filtered = filter.is_some() as usize;
|
||||||
let syntax = match filter.as_ref() {
|
let syntax = match filter.as_ref() {
|
||||||
@ -313,6 +336,70 @@ mod segment {
|
|||||||
search_batcher.time_spent.push(process_time);
|
search_batcher.time_spent.push(process_time);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn add_documents(
|
||||||
|
&'static self,
|
||||||
|
documents_query: &UpdateDocumentsQuery,
|
||||||
|
index_creation: bool,
|
||||||
|
request: &HttpRequest,
|
||||||
|
) {
|
||||||
|
let user_agents = request
|
||||||
|
.headers()
|
||||||
|
.get(USER_AGENT)
|
||||||
|
.map(|header| header.to_str().unwrap_or("unknown").to_string());
|
||||||
|
let primary_key = documents_query.primary_key.clone();
|
||||||
|
let content_type = request
|
||||||
|
.headers()
|
||||||
|
.get(CONTENT_TYPE)
|
||||||
|
.map(|s| s.to_str().unwrap_or("unkown"))
|
||||||
|
.unwrap()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut lock = self.documents_added_batcher.lock().await;
|
||||||
|
for user_agent in user_agents {
|
||||||
|
lock.user_agents.insert(user_agent);
|
||||||
|
}
|
||||||
|
lock.content_types.insert(content_type);
|
||||||
|
if let Some(primary_key) = primary_key {
|
||||||
|
lock.primary_keys.insert(primary_key);
|
||||||
|
}
|
||||||
|
lock.index_creation |= index_creation;
|
||||||
|
// drop the lock here
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_documents(
|
||||||
|
&'static self,
|
||||||
|
documents_query: &UpdateDocumentsQuery,
|
||||||
|
index_creation: bool,
|
||||||
|
request: &HttpRequest,
|
||||||
|
) {
|
||||||
|
let user_agents = request
|
||||||
|
.headers()
|
||||||
|
.get(USER_AGENT)
|
||||||
|
.map(|header| header.to_str().unwrap_or("unknown").to_string());
|
||||||
|
let primary_key = documents_query.primary_key.clone();
|
||||||
|
let content_type = request
|
||||||
|
.headers()
|
||||||
|
.get(CONTENT_TYPE)
|
||||||
|
.map(|s| s.to_str().unwrap_or("unkown"))
|
||||||
|
.unwrap()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut lock = self.documents_updated_batcher.lock().await;
|
||||||
|
for user_agent in user_agents {
|
||||||
|
lock.user_agents.insert(user_agent);
|
||||||
|
}
|
||||||
|
lock.content_types.insert(content_type);
|
||||||
|
if let Some(primary_key) = primary_key {
|
||||||
|
lock.primary_keys.insert(primary_key);
|
||||||
|
}
|
||||||
|
lock.index_creation |= index_creation;
|
||||||
|
// drop the lock here
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for SegmentAnalytics {
|
impl Display for SegmentAnalytics {
|
||||||
@ -410,6 +497,39 @@ mod segment {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct DocumentsBatcher {
|
||||||
|
// set to true when at least one request was received
|
||||||
|
updated: bool,
|
||||||
|
|
||||||
|
// context
|
||||||
|
user_agents: HashSet<String>,
|
||||||
|
|
||||||
|
content_types: HashSet<String>,
|
||||||
|
primary_keys: HashSet<String>,
|
||||||
|
index_creation: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DocumentsBatcher {
|
||||||
|
pub fn into_event(mut self, user: User, event_name: String) -> Track {
|
||||||
|
let context = Some(json!({ "user-agent": self.user_agents}));
|
||||||
|
|
||||||
|
let properties = json!({
|
||||||
|
"payload_type": self.content_types,
|
||||||
|
"primary_key": self.primary_keys,
|
||||||
|
"index_creation": self.index_creation,
|
||||||
|
});
|
||||||
|
|
||||||
|
Track {
|
||||||
|
user,
|
||||||
|
event: event_name,
|
||||||
|
context,
|
||||||
|
properties,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we are in debug mode OR the analytics feature is disabled
|
// if we are in debug mode OR the analytics feature is disabled
|
||||||
@ -440,6 +560,20 @@ impl Analytics for MockAnalytics {
|
|||||||
fn end_get_search(&'static self, _process_time: usize) {}
|
fn end_get_search(&'static self, _process_time: usize) {}
|
||||||
fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
|
fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
|
||||||
fn end_post_search(&'static self, _process_time: usize) {}
|
fn end_post_search(&'static self, _process_time: usize) {}
|
||||||
|
fn add_documents(
|
||||||
|
&'static self,
|
||||||
|
_documents_query: &UpdateDocumentsQuery,
|
||||||
|
_index_creation: bool,
|
||||||
|
_request: &HttpRequest,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
fn update_documents(
|
||||||
|
&'static self,
|
||||||
|
_documents_query: &UpdateDocumentsQuery,
|
||||||
|
_index_creation: bool,
|
||||||
|
_request: &HttpRequest,
|
||||||
|
) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for MockAnalytics {
|
impl Display for MockAnalytics {
|
||||||
@ -462,4 +596,19 @@ pub trait Analytics: Display + Sync + Send {
|
|||||||
fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest);
|
fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest);
|
||||||
/// This method should be called once a post search request has succeeded
|
/// This method should be called once a post search request has succeeded
|
||||||
fn end_post_search(&'static self, process_time: usize);
|
fn end_post_search(&'static self, process_time: usize);
|
||||||
|
|
||||||
|
// this method should be called to batch a add documents request
|
||||||
|
fn add_documents(
|
||||||
|
&'static self,
|
||||||
|
documents_query: &UpdateDocumentsQuery,
|
||||||
|
index_creation: bool,
|
||||||
|
request: &HttpRequest,
|
||||||
|
);
|
||||||
|
// this method should be called to batch a update documents request
|
||||||
|
fn update_documents(
|
||||||
|
&'static self,
|
||||||
|
documents_query: &UpdateDocumentsQuery,
|
||||||
|
index_creation: bool,
|
||||||
|
request: &HttpRequest,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
@ -123,7 +123,7 @@ pub async fn get_all_documents(
|
|||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||||
pub struct UpdateDocumentsQuery {
|
pub struct UpdateDocumentsQuery {
|
||||||
primary_key: Option<String>,
|
pub primary_key: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_documents(
|
pub async fn add_documents(
|
||||||
|
@ -3,7 +3,7 @@ use log::debug;
|
|||||||
use meilisearch_lib::index::{default_crop_length, SearchQuery, DEFAULT_SEARCH_LIMIT};
|
use meilisearch_lib::index::{default_crop_length, SearchQuery, DEFAULT_SEARCH_LIMIT};
|
||||||
use meilisearch_lib::MeiliSearch;
|
use meilisearch_lib::MeiliSearch;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::{json, Value};
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::error::ResponseError;
|
use crate::error::ResponseError;
|
||||||
|
@ -14,7 +14,7 @@ use crate::extractors::authentication::{policies::*, GuardedData};
|
|||||||
use crate::ApiKeys;
|
use crate::ApiKeys;
|
||||||
|
|
||||||
mod dump;
|
mod dump;
|
||||||
mod indexes;
|
pub mod indexes;
|
||||||
|
|
||||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(web::resource("/health").route(web::get().to(get_health)))
|
cfg.service(web::resource("/health").route(web::get().to(get_health)))
|
||||||
|
Loading…
Reference in New Issue
Block a user