1800: Analytics r=irevoire a=irevoire

Closes #1784
Implements [this spec](https://github.com/meilisearch/specifications/blob/update-analytics-specs/text/0034-telemetry-policies.md) 

# Anonymous Analytics Policy

## 1. Functional Specification

### I. Summary

This specification describes an exhaustive list of anonymous metrics collected by the MeiliSearch binary. It also describes the tools we use for this collection and how we identify a MeiliSearch instance.

### II. Motivation

At MeiliSearch, our vision is to provide an easy-to-use search solution that meets the essential needs of our users. At all times, we strive to understand our users better and meet their expectations in the best possible way.

Although we can gather needs and understand our users through several channels such as GitHub, Slack, surveys, interviews, or roadmap votes, we realize that this is not enough to have a complete view of MeiliSearch usage and feature adoption. By cross-referencing our product discovery phases with aggregated quantitative data, we want to make the product much better than it is today, and take our decision-making a step further to build a product that users love.

### III. Explanation

#### General Data Protection Regulation (GDPR)

The metrics collected are non-sensitive, non-personal and do not identify an individual or a group of individuals using MeiliSearch. The data collected is secured and anonymized. We do not collect any data from the values stored in the documents.

We, the MeiliSearch team, provide an email address so that users can request the removal of their data: privacy@meilisearch.com.<br>
Thanks to the unique identifier generated for their MeiliSearch installation (the `Instance uuid` shown when launching MeiliSearch), we can remove the corresponding data from all the tools we describe below. Any questions regarding the management of the collected data can be sent to the same email address.

#### Tools

##### Segment

The collected data is sent to [Segment](https://segment.com/). Segment is a platform for data collection and provides data management tools.

##### Amplitude

[Amplitude](https://amplitude.com/) is a tool for graphing and highlighting collected data. Segment feeds Amplitude so that we can build visualizations according to our needs.

-----------
# The `identify` call we send every hour:

## System Configuration `system`

This property gathers essential information about the type of machine MeiliSearch runs on, which helps us advise users on the hardware to choose according to their data volume and use cases. A sketch of the assembled hourly payload follows the `stats` list below.

 - [x] `system` => Never changes but is still sent every hour
     - [x] distribution | The distribution MeiliSearch is launched on, eg: Arch Linux
     - [x] kernel_version | The kernel version MeiliSearch is launched on, eg: 5.14.10-arch1-1
     - [x] cores | How many cores the machine has, eg: 24
     - [x] ram_size | Total capacity of the machine's RAM. Expressed in `KB`, eg: 33604210
     - [x] disk_size | Total capacity of the biggest disk. Expressed in `KB`, eg: 336042103
     - [x] server_provider | Users can tell us which provider MeiliSearch is hosted on by filling the `MEILI_SERVER_PROVIDER` env var. This is also filled by our providers' deploy scripts, e.g. GCP [cloud-config.yaml](56a7c2630c/scripts/providers/gcp/cloud-config.yaml (L33)), eg: gcp

## MeiliSearch Configuration

- [x] `context.app.version`: MeiliSearch version, eg: 0.23.0
- [x] `env`: `production` / `development`, eg: `production`
- [x] `has_snapshot`: Whether the MeiliSearch instance has snapshots activated, eg: `true`

## MeiliSearch Statistics `stats`

 - [x] `stats`
     - [x] `database_size`: Size of the indexed data. Expressed in `KB`, eg: 180230
     - [x] `indexes_number`: Number of indexes, eg: 2
     - [x] `documents_number`: Number of indexed documents, eg: 165847
     - [x] `start_since_days`: Number of days since the instance was launched, eg: 328
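
Putting the pieces above together, here is a hedged sketch of the traits object that the hourly `identify` call could carry. The values are the example values from this spec, not real measurements, and the shape is simplified; the actual construction is `compute_traits` in `segment_analytics.rs` in the diff below.

```rust
use serde_json::{json, Value};

// Sketch only: values are the example values from this spec; the
// `server_provider` lookup mirrors the `MEILI_SERVER_PROVIDER` env var
// described above.
fn example_identify_traits() -> Value {
    json!({
        "system": {
            "distribution": "Arch Linux",
            "kernel_version": "5.14.10-arch1-1",
            "cores": 24,
            "ram_size": 33604210,   // KB
            "disk_size": 336042103, // KB
            "server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
        },
        "env": "production",
        "has_snapshot": true,
        "stats": {
            "database_size": 180230, // KB
            "indexes_number": 2,
            "documents_number": 165847,
            "start_since_days": 328,
        },
    })
}
```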

---------

- [x] Launched | This is the first event, sent to mark that MeiliSearch was launched for the first time

---------

- [x] `Documents Searched POST`: The Documents Searched event is sent once an hour. The event's properties are aggregated over all search operations during that hour, so as not to track every request and generate unnecessary noise (a sketch of this aggregation follows the list).
  - [x] `user-agent`: Represents all the user-agents encountered on this endpoint during one hour, eg: `["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]`
  - [x] `requests`
      - [x] `99th_response_time`: The maximum latency, in ms, for the fastest 99% of requests, eg: `57ms`
      - [x] `total_succeeded`: The total number of succeeded search requests, eg: `3456`
      - [x] `total_failed`: The total number of failed search requests, eg: `24`
      - [x] `total_received`: The total number of received search requests, eg: `3480`
  - [x] `sort`
      - [x] `with_geoPoint`: Whether the built-in `_geoPoint` sort rule was used, eg: `true` / `false`
      - [x] `avg_criteria_number`: The average number of sort criteria among all the requests containing the `sort` parameter. `"sort": []` counts as 0, while omitting `sort` does not influence the average, eg: `2`
  - [x] `filter`
      - [x] `with_geoRadius`: Whether the built-in `_geoRadius` filter rule was used, eg: `true` / `false`
      - [x] `avg_criteria_number`: The average number of filter criteria among all the requests containing the `filter` parameter. `"filter": []` counts as 0, while omitting `filter` does not influence the average, eg: `4`
      - [x] `most_used_syntax`: The most used filter syntax among all the requests containing the `filter` parameter: `string` / `array` / `mixed`, eg: `mixed`
  - [x] `q`
      - [x] `avg_terms_number`: The average number of terms for the `q` parameter among all requests, eg: `5`
  - [x] `pagination`:
      - [x] `max_limit`: The maximum limit encountered among all requests, eg: `20`
      - [x] `max_offset`: The maximum offset encountered among all requests, eg: `1000`
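
A minimal sketch of that hourly aggregation, under the assumption that per-request samples are collected as plain counters. The field names loosely mirror the `SearchAggregator` in the diff below; the exact percentile formula the implementation uses may differ.

```rust
// Sketch: turn per-request samples collected over one hour into the
// `requests` and `sort` figures listed above.
struct HourlySearches {
    time_spent_ms: Vec<usize>,            // latency of each succeeded request
    total_received: usize,
    total_succeeded: usize,
    sort_sum_of_criteria_terms: usize,    // += sort.len() for each request using `sort`
    sort_total_number_of_criteria: usize, // += 1 for each request using `sort`
}

impl HourlySearches {
    fn report(mut self) {
        let p99 = if self.time_spent_ms.is_empty() {
            0
        } else {
            // The worst latency among the fastest 99% of requests.
            self.time_spent_ms.sort_unstable();
            let keep = ((self.time_spent_ms.len() as f64) * 0.99).ceil() as usize;
            self.time_spent_ms[keep - 1]
        };
        // Requests that omitted `sort` entirely do not influence the average.
        let avg_criteria = self.sort_sum_of_criteria_terms as f64
            / self.sort_total_number_of_criteria.max(1) as f64;
        let total_failed = self.total_received.saturating_sub(self.total_succeeded);
        println!("99th_response_time: {}ms", p99);
        println!("total_failed: {}", total_failed);
        println!("avg_criteria_number: {:.2}", avg_criteria);
    }
}

fn main() {
    // Sample values chosen to reproduce the example figures above
    // (latency samples truncated for brevity).
    HourlySearches {
        time_spent_ms: vec![12, 35, 57, 8, 21],
        total_received: 3480,
        total_succeeded: 3456,
        sort_sum_of_criteria_terms: 6912,
        sort_total_number_of_criteria: 3456,
    }
    .report();
}
```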

---

- [x] `Documents Searched GET`: The Documents Searched event is sent once an hour. The event's properties are aggregated over all search operations during that hour, so as not to track every request and generate unnecessary noise.
  - [x] `user-agent`: Represents all the user-agents encountered on this endpoint during one hour, eg: `["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]`
  - [x] `requests`
      - [x] `99th_response_time`: The maximum latency, in ms, for the fastest 99% of requests, eg: `57ms`
      - [x] `total_succeeded`: The total number of succeeded search requests, eg: `3456`
      - [x] `total_failed`: The total number of failed search requests, eg: `24`
      - [x] `total_received`: The total number of received search requests, eg: `3480`
  - [x] `sort`
      - [x] `with_geoPoint`: Whether the built-in `_geoPoint` sort rule was used, eg: `true` / `false`
      - [x] `avg_criteria_number`: The average number of sort criteria among all the requests containing the `sort` parameter. `"sort": []` counts as 0, while omitting `sort` does not influence the average, eg: `2`
  - [x] `filter`
      - [x] `with_geoRadius`: Whether the built-in `_geoRadius` filter rule was used, eg: `true` / `false`
      - [x] `avg_criteria_number`: The average number of filter criteria among all the requests containing the `filter` parameter. `"filter": []` counts as 0, while omitting `filter` does not influence the average, eg: `4`
      - [x] `most_used_syntax`: The most used filter syntax among all the requests containing the `filter` parameter: `string` / `array` / `mixed`, eg: `mixed`
  - [x] `q`
      - [x] `avg_terms_number`: The average number of terms for the `q` parameter among all requests, eg: `5`
  - [x] `pagination`:
      - [x] `max_limit`: The maximum limit encountered among all requests, eg: `20`
      - [x] `max_offset`: The maximum offset encountered among all requests, eg: `1000`

---

- [x] `Index Created`
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `primary_key`: The name of the field used as primary key if set, otherwise `null`, eg: `id`

---

- [x] `Index Updated`
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `primary_key`: The name of the field used as primary key if set, otherwise `null`, eg: `id`

---

- [x] `Documents Added`: The Documents Added event is sent once an hour. The event's properties are aggregated over all POST `/documents` addition operations during that hour, so as not to track every request and generate unnecessary noise.
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `payload_type`: Represents all the `payload_type` encountered on this endpoint during one hour, eg: [`text/csv`]
  - [x] `primary_key`: The name of the field used as primary key if set, otherwise `null`, eg: `id`
  - [x] `index_creation`: Whether the request triggered the creation of the index, eg: `false`

---

- [x] `Documents Updated`: The Documents Updated event is sent once an hour. The event's properties are aggregated over all PUT `/documents` update operations during that hour, so as not to track every request and generate unnecessary noise.
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `payload_type`: Represents all the `payload_type` encountered on this endpoint during one hour, eg: [`application/json`]
  - [x] `primary_key`: The name of the field used as primary key if set, otherwise `null`, eg: `id`
  - [x] `index_creation`: Whether the request triggered the creation of the index, eg: `false`

---

- [x] Settings Updated (see the sketch after this list for how these fields are derived)
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `ranking_rules`
      - [x] `sort_position`: Position of the `sort` ranking rule if any, otherwise `null`, eg: `5`
  - [x] `sortable_attributes`
      - [x] `total`: Number of sortable attributes, eg: `3`
      - [x] `has_geo`: Indicates whether `_geo` is set as a sortable attribute, eg: `false`
  - [x] `filterable_attributes`
      - [x] `total`: Number of filterable attributes, eg: `3`
      - [x] `has_geo`: Indicates whether `_geo` is set as a filterable attribute, eg: `false`
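
A minimal sketch of how the settings-derived fields above (and in the three settings events below) can be computed. The `sort` ranking rule and the `_geo` attribute name are the real ones from this spec; the helper names are hypothetical, since the real extraction happens inline in the `make_setting_route!` analytics closures in the diff below.

```rust
use std::collections::BTreeSet;

// Hypothetical helper: position of the `sort` ranking rule,
// reported as `null` in the event when absent.
fn sort_position(ranking_rules: &[String]) -> Option<usize> {
    ranking_rules.iter().position(|rule| rule == "sort")
}

// Hypothetical helper: (total, has_geo) for a set of sortable or
// filterable attributes.
fn attribute_stats(attributes: &BTreeSet<String>) -> (usize, bool) {
    (attributes.len(), attributes.contains("_geo"))
}
```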

---

- [x] `RankingRules Updated`
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `sort_position`: Position of the `sort` ranking rule if any, otherwise `null`, eg: `5`

---

- [x] `SortableAttributes Updated`
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `total`: Number of sortable attributes, eg: `3`
  - [x] `has_geo`: Indicates whether `_geo` is set as a sortable attribute, eg: `false`

---

- [x] `FilterableAttributes Updated`
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]
  - [x] `total`: Number of filterable attributes, eg: `3`
  - [x] `has_geo`: Indicates whether `_geo` is set as a filterable attribute, eg: `false`

---

- [x] Dump Created
  - [x] `user-agent`: Represents the user-agent encountered for this API call, eg: ["MeiliSearch Ruby (2.1)", "Ruby (3.0)"]

---

Ensure the user-id (`instance-uid`) file is correctly saved and loaded with:
- [x] the dumps
- [x] the snapshots



- [x] Ensure the CLI only shows the instance UUID if analytics are activated at launch **or a UUID already exists** (i.e. even if MeiliSearch was previously launched without analytics); a simplified sketch of this lookup follows
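
For reference, a simplified sketch of that lookup, condensed from `find_user_id` in the diff below. One assumption here: the fallback path under `~/.config/MeiliSearch` is passed in as a parameter instead of being derived from the db path as the real code does.

```rust
use std::{fs, path::Path};

/// Returns the persisted instance UID, looking first beside the database
/// and then in the global MeiliSearch config directory.
fn find_user_id(db_path: &Path, config_uid_path: Option<&Path>) -> Option<String> {
    fs::read_to_string(db_path.join("instance-uid"))
        .ok()
        .or_else(|| fs::read_to_string(config_uid_path?).ok())
}
```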

Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: Irevoire <tamo@meilisearch.com>
bors[bot] 2021-10-29 16:11:03 +00:00 committed by GitHub
commit c32f13a909
26 changed files with 1037 additions and 206 deletions

Cargo.lock (generated)

@ -849,6 +849,27 @@ dependencies = [
"generic-array 0.14.4",
]
[[package]]
name = "dirs-next"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf36e65a80337bea855cd4ef9b8401ffce06a7baedf2e85ec467b1ac3f6e82b6"
dependencies = [
"cfg-if 1.0.0",
"dirs-sys-next",
]
[[package]]
name = "dirs-sys-next"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "discard"
version = "1.0.4"
@ -1653,11 +1674,13 @@ dependencies = [
"parking_lot",
"paste",
"pin-project",
"platform-dirs",
"rand",
"rayon",
"regex",
"reqwest",
"rustls",
"segment",
"serde",
"serde_json",
"serde_url_params",
@ -1677,7 +1700,6 @@ dependencies = [
"uuid",
"vergen",
"walkdir",
"whoami",
"zip",
]
@ -2172,6 +2194,15 @@ version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
name = "platform-dirs"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e188d043c1a692985f78b5464853a263f1a27e5bd6322bad3a4078ee3c998a38"
dependencies = [
"dirs-next",
]
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -2353,6 +2384,16 @@ dependencies = [
"bitflags",
]
[[package]]
name = "redox_users"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64"
dependencies = [
"getrandom",
"redox_syscall",
]
[[package]]
name = "regex"
version = "1.5.4"
@ -2540,6 +2581,20 @@ dependencies = [
"untrusted",
]
[[package]]
name = "segment"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bdcc286fff0e7c5ccd46c06a301c7a8a848b06acedc6983707bd311eb358002"
dependencies = [
"async-trait",
"chrono",
"reqwest",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "semver"
version = "0.9.0"


@ -51,10 +51,12 @@ mime = "0.3.16"
num_cpus = "1.13.0"
once_cell = "1.8.0"
parking_lot = "0.11.2"
platform-dirs = "0.3.0"
rand = "0.8.4"
rayon = "1.5.1"
regex = "1.5.4"
rustls = "0.19.1"
segment = { version = "0.1.2", optional = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.67", features = ["preserve_order"] }
sha2 = "0.9.6"
@ -69,8 +71,6 @@ uuid = { version = "0.8.2", features = ["serde"] }
walkdir = "2.3.2"
obkv = "0.2.0"
pin-project = "1.0.8"
whoami = { version = "1.1.3", optional = true }
reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true }
sysinfo = "0.20.2"
tokio-stream = "0.1.7"
@ -91,7 +91,7 @@ mini-dashboard = [
"tempfile",
"zip",
]
analytics = ["whoami", "reqwest"]
analytics = ["segment"]
default = ["analytics", "mini-dashboard"]
[target.'cfg(target_os = "linux")'.dependencies]


@ -1,126 +0,0 @@
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use log::debug;
use meilisearch_lib::MeiliSearch;
use serde::Serialize;
use siphasher::sip::SipHasher;
use crate::Opt;
const AMPLITUDE_API_KEY: &str = "f7fba398780e06d8fe6666a9be7e3d47";
#[derive(Debug, Serialize)]
struct EventProperties {
database_size: u64,
last_update_timestamp: Option<i64>, //timestamp
number_of_documents: Vec<u64>,
}
impl EventProperties {
async fn from(data: MeiliSearch) -> anyhow::Result<EventProperties> {
let stats = data.get_all_stats().await?;
let database_size = stats.database_size;
let last_update_timestamp = stats.last_update.map(|u| u.timestamp());
let number_of_documents = stats
.indexes
.values()
.map(|index| index.number_of_documents)
.collect();
Ok(EventProperties {
database_size,
last_update_timestamp,
number_of_documents,
})
}
}
#[derive(Debug, Serialize)]
struct UserProperties<'a> {
env: &'a str,
start_since_days: u64,
user_email: Option<String>,
server_provider: Option<String>,
}
#[derive(Debug, Serialize)]
struct Event<'a> {
user_id: &'a str,
event_type: &'a str,
device_id: &'a str,
time: u64,
app_version: &'a str,
user_properties: UserProperties<'a>,
event_properties: Option<EventProperties>,
}
#[derive(Debug, Serialize)]
struct AmplitudeRequest<'a> {
api_key: &'a str,
events: Vec<Event<'a>>,
}
pub async fn analytics_sender(data: MeiliSearch, opt: Opt) {
let username = whoami::username();
let hostname = whoami::hostname();
let platform = whoami::platform();
let uid = username + &hostname + &platform.to_string();
let mut hasher = SipHasher::new();
uid.hash(&mut hasher);
let hash = hasher.finish();
let uid = format!("{:X}", hash);
let platform = platform.to_string();
let first_start = Instant::now();
loop {
let n = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let user_id = &uid;
let device_id = &platform;
let time = n.as_secs();
let event_type = "runtime_tick";
let elapsed_since_start = first_start.elapsed().as_secs() / 86_400; // One day
let event_properties = EventProperties::from(data.clone()).await.ok();
let app_version = env!("CARGO_PKG_VERSION").to_string();
let app_version = app_version.as_str();
let user_email = std::env::var("MEILI_USER_EMAIL").ok();
let server_provider = std::env::var("MEILI_SERVER_PROVIDER").ok();
let user_properties = UserProperties {
env: &opt.env,
start_since_days: elapsed_since_start,
user_email,
server_provider,
};
let event = Event {
user_id,
event_type,
device_id,
time,
app_version,
user_properties,
event_properties,
};
let request = AmplitudeRequest {
api_key: AMPLITUDE_API_KEY,
events: vec![event],
};
let response = reqwest::Client::new()
.post("https://api2.amplitude.com/2/httpapi")
.timeout(Duration::from_secs(60)) // 1 minute max
.json(&request)
.send()
.await;
if let Err(e) = response {
debug!("Unsuccessful call to Amplitude: {}", e);
}
tokio::time::sleep(Duration::from_secs(3600)).await;
}
}


@ -0,0 +1,51 @@
use std::{any::Any, sync::Arc};
use actix_web::HttpRequest;
use serde_json::Value;
use crate::{routes::indexes::documents::UpdateDocumentsQuery, Opt};
use super::{find_user_id, Analytics};
pub struct MockAnalytics;
#[derive(Default)]
pub struct SearchAggregator {}
#[allow(dead_code)]
impl SearchAggregator {
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
Self::default()
}
pub fn finish(&mut self, _: &dyn Any) {}
}
impl MockAnalytics {
#[allow(clippy::new_ret_no_self)]
pub fn new(opt: &Opt) -> (Arc<dyn Analytics>, String) {
let user = find_user_id(&opt.db_path).unwrap_or_default();
(Arc::new(Self), user)
}
}
impl Analytics for MockAnalytics {
// These methods are noop and should be optimized out
fn publish(&self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
fn get_search(&self, _aggregate: super::SearchAggregator) {}
fn post_search(&self, _aggregate: super::SearchAggregator) {}
fn add_documents(
&self,
_documents_query: &UpdateDocumentsQuery,
_index_creation: bool,
_request: &HttpRequest,
) {
}
fn update_documents(
&self,
_documents_query: &UpdateDocumentsQuery,
_index_creation: bool,
_request: &HttpRequest,
) {
}
}


@ -0,0 +1,84 @@
mod mock_analytics;
// if we are in release mode and the feature analytics was enabled
#[cfg(all(not(debug_assertions), feature = "analytics"))]
mod segment_analytics;
use std::fs;
use std::path::{Path, PathBuf};
use actix_web::HttpRequest;
use once_cell::sync::Lazy;
use platform_dirs::AppDirs;
use serde_json::Value;
use crate::routes::indexes::documents::UpdateDocumentsQuery;
pub use mock_analytics::MockAnalytics;
// if we are in debug mode OR the analytics feature is disabled
// the `SegmentAnalytics` point to the mock instead of the real analytics
#[cfg(any(debug_assertions, not(feature = "analytics")))]
pub type SegmentAnalytics = mock_analytics::MockAnalytics;
#[cfg(any(debug_assertions, not(feature = "analytics")))]
pub type SearchAggregator = mock_analytics::SearchAggregator;
// if we are in release mode and the feature analytics was enabled
// we use the real analytics
#[cfg(all(not(debug_assertions), feature = "analytics"))]
pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
pub type SearchAggregator = segment_analytics::SearchAggregator;
/// The MeiliSearch config dir:
/// `~/.config/MeiliSearch` on *NIX or *BSD.
/// `~/Library/Application Support` on macOS.
/// `%APPDATA%` (= `C:\Users\%USERNAME%\AppData\Roaming`) on Windows.
static MEILISEARCH_CONFIG_PATH: Lazy<Option<PathBuf>> =
Lazy::new(|| AppDirs::new(Some("MeiliSearch"), false).map(|appdir| appdir.config_dir));
fn config_user_id_path(db_path: &Path) -> Option<PathBuf> {
db_path
.canonicalize()
.ok()
.map(|path| {
path.join("instance-uid")
.display()
.to_string()
.replace("/", "-")
})
.zip(MEILISEARCH_CONFIG_PATH.as_ref())
.map(|(filename, config_path)| config_path.join(filename.trim_start_matches('-')))
}
/// Look for the instance-uid in the `data.ms` or in `~/.config/MeiliSearch/path-to-db-instance-uid`
fn find_user_id(db_path: &Path) -> Option<String> {
fs::read_to_string(db_path.join("instance-uid"))
.ok()
.or_else(|| fs::read_to_string(&config_user_id_path(db_path)?).ok())
}
pub trait Analytics: Sync + Send {
/// The method used to publish most analytics that do not need to be batched every hour
fn publish(&self, event_name: String, send: Value, request: Option<&HttpRequest>);
/// This method should be called to aggregate a get search
fn get_search(&self, aggregate: SearchAggregator);
/// This method should be called to aggregate a post search
fn post_search(&self, aggregate: SearchAggregator);
// this method should be called to aggregate an add documents request
fn add_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
);
// this method should be called to batch an update documents request
fn update_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
);
}


@ -0,0 +1,517 @@
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix_web::http::header::USER_AGENT;
use actix_web::HttpRequest;
use http::header::CONTENT_TYPE;
use meilisearch_lib::index::{SearchQuery, SearchResult};
use meilisearch_lib::index_controller::Stats;
use meilisearch_lib::MeiliSearch;
use once_cell::sync::Lazy;
use regex::Regex;
use segment::message::{Identify, Track, User};
use segment::{AutoBatcher, Batcher, HttpClient};
use serde_json::{json, Value};
use sysinfo::{DiskExt, System, SystemExt};
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use uuid::Uuid;
use crate::analytics::Analytics;
use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::Opt;
use super::{config_user_id_path, MEILISEARCH_CONFIG_PATH};
/// Write the instance-uid in the `data.ms` and in `~/.config/MeiliSearch/path-to-db-instance-uid`. Ignore the errors.
fn write_user_id(db_path: &Path, user_id: &str) {
let _ = fs::write(db_path.join("instance-uid"), user_id.as_bytes());
if let Some((meilisearch_config_path, user_id_path)) = MEILISEARCH_CONFIG_PATH
.as_ref()
.zip(config_user_id_path(db_path))
{
let _ = fs::create_dir_all(&meilisearch_config_path);
let _ = fs::write(user_id_path, user_id.as_bytes());
}
}
const SEGMENT_API_KEY: &str = "vHi89WrNDckHSQssyUJqLvIyp2QFITSC";
pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
request
.headers()
.get(USER_AGENT)
.map(|header| header.to_str().ok())
.flatten()
.unwrap_or("unknown")
.split(';')
.map(str::trim)
.map(ToString::to_string)
.collect()
}
pub enum AnalyticsMsg {
BatchMessage(Track),
AggregateGetSearch(SearchAggregator),
AggregatePostSearch(SearchAggregator),
AggregateAddDocuments(DocumentsAggregator),
AggregateUpdateDocuments(DocumentsAggregator),
}
pub struct SegmentAnalytics {
sender: Sender<AnalyticsMsg>,
user: User,
}
impl SegmentAnalytics {
pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> (Arc<dyn Analytics>, String) {
let user_id = super::find_user_id(&opt.db_path);
let first_time_run = user_id.is_none();
let user_id = user_id.unwrap_or_else(|| Uuid::new_v4().to_string());
write_user_id(&opt.db_path, &user_id);
let client = HttpClient::default();
let user = User::UserId { user_id };
let batcher = AutoBatcher::new(client, Batcher::new(None), SEGMENT_API_KEY.to_string());
let (sender, inbox) = mpsc::channel(100); // How many analytics events we can buffer
let segment = Box::new(Segment {
inbox,
user: user.clone(),
opt: opt.clone(),
batcher,
post_search_aggregator: SearchAggregator::default(),
get_search_aggregator: SearchAggregator::default(),
add_documents_aggregator: DocumentsAggregator::default(),
update_documents_aggregator: DocumentsAggregator::default(),
});
tokio::spawn(segment.run(meilisearch.clone()));
let this = Self {
sender,
user: user.clone(),
};
// batch the "Launched" track event if this is the instance's first run
if first_time_run {
this.publish("Launched".to_string(), json!({}), None);
}
(Arc::new(this), user.to_string())
}
}
impl super::Analytics for SegmentAnalytics {
fn publish(&self, event_name: String, mut send: Value, request: Option<&HttpRequest>) {
let user_agent = request
.map(|req| req.headers().get(USER_AGENT))
.flatten()
.map(|header| header.to_str().unwrap_or("unknown"))
.map(|s| s.split(';').map(str::trim).collect::<Vec<&str>>());
send["user-agent"] = json!(user_agent);
let event = Track {
user: self.user.clone(),
event: event_name.clone(),
properties: send,
..Default::default()
};
let _ = self
.sender
.try_send(AnalyticsMsg::BatchMessage(event.into()));
}
fn get_search(&self, aggregate: SearchAggregator) {
let _ = self
.sender
.try_send(AnalyticsMsg::AggregateGetSearch(aggregate));
}
fn post_search(&self, aggregate: SearchAggregator) {
let _ = self
.sender
.try_send(AnalyticsMsg::AggregatePostSearch(aggregate));
}
fn add_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self
.sender
.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate));
}
fn update_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self
.sender
.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
}
}
pub struct Segment {
inbox: Receiver<AnalyticsMsg>,
user: User,
opt: Opt,
batcher: AutoBatcher,
get_search_aggregator: SearchAggregator,
post_search_aggregator: SearchAggregator,
add_documents_aggregator: DocumentsAggregator,
update_documents_aggregator: DocumentsAggregator,
}
impl Segment {
fn compute_traits(opt: &Opt, stats: Stats) -> Value {
static FIRST_START_TIMESTAMP: Lazy<Instant> = Lazy::new(Instant::now);
static SYSTEM: Lazy<Value> = Lazy::new(|| {
let mut sys = System::new_all();
sys.refresh_all();
let kernel_version = sys
.kernel_version()
.map(|k| k.split_once("-").map(|(k, _)| k.to_string()))
.flatten();
json!({
"distribution": sys.name(),
"kernel_version": kernel_version,
"cores": sys.processors().len(),
"ram_size": sys.total_memory(),
"disk_size": sys.disks().iter().map(|disk| disk.available_space()).max(),
"server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
})
});
let infos = json!({
"env": opt.env.clone(),
"has_snapshot": opt.schedule_snapshot,
});
let number_of_documents = stats
.indexes
.values()
.map(|index| index.number_of_documents)
.collect::<Vec<u64>>();
json!({
"start_since_days": FIRST_START_TIMESTAMP.elapsed().as_secs() / (60 * 60 * 24), // one day
"system": *SYSTEM,
"stats": {
"database_size": stats.database_size,
"indexes_number": stats.indexes.len(),
"documents_number": number_of_documents,
},
"infos": infos,
})
}
async fn run(mut self, meilisearch: MeiliSearch) {
const INTERVAL: Duration = Duration::from_secs(60 * 60); // one hour
let mut interval = tokio::time::interval(INTERVAL);
loop {
select! {
_ = interval.tick() => {
self.tick(meilisearch.clone()).await;
},
msg = self.inbox.recv() => {
match msg {
Some(AnalyticsMsg::BatchMessage(msg)) => drop(self.batcher.push(msg).await),
Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
None => (),
}
}
}
}
}
async fn tick(&mut self, meilisearch: MeiliSearch) {
if let Ok(stats) = meilisearch.get_all_stats().await {
let _ = self
.batcher
.push(Identify {
context: Some(json!({
"app": {
"version": env!("CARGO_PKG_VERSION").to_string(),
},
})),
user: self.user.clone(),
traits: Self::compute_traits(&self.opt, stats),
..Default::default()
})
.await;
}
let get_search = std::mem::take(&mut self.get_search_aggregator)
.into_event(&self.user, "Document Searched GET");
let post_search = std::mem::take(&mut self.post_search_aggregator)
.into_event(&self.user, "Document Searched POST");
let add_documents = std::mem::take(&mut self.add_documents_aggregator)
.into_event(&self.user, "Documents Added");
let update_documents = std::mem::take(&mut self.update_documents_aggregator)
.into_event(&self.user, "Documents Updated");
if let Some(get_search) = get_search {
let _ = self.batcher.push(get_search).await;
}
if let Some(post_search) = post_search {
let _ = self.batcher.push(post_search).await;
}
if let Some(add_documents) = add_documents {
let _ = self.batcher.push(add_documents).await;
}
if let Some(update_documents) = update_documents {
let _ = self.batcher.push(update_documents).await;
}
let _ = self.batcher.flush().await;
}
}
#[derive(Default)]
pub struct SearchAggregator {
// context
user_agents: HashSet<String>,
// requests
total_received: usize,
total_succeeded: usize,
time_spent: Vec<usize>,
// sort
sort_with_geo_point: bool,
// every time a request has a sort, this field must be incremented by the number of criteria it contains
sort_sum_of_criteria_terms: usize,
// every time a request has a sort, this field must be incremented by one
sort_total_number_of_criteria: usize,
// filter
filter_with_geo_radius: bool,
// every time a request has a filter, this field must be incremented by the number of terms it contains
filter_sum_of_criteria_terms: usize,
// every time a request has a filter, this field must be incremented by one
filter_total_number_of_criteria: usize,
used_syntax: HashMap<String, usize>,
// q
// every time a request has a q field, this field must be incremented by the number of terms
sum_of_terms_count: usize,
// every time a request has a q field, this field must be incremented by one
total_number_of_q: usize,
// pagination
max_limit: usize,
max_offset: usize,
}
impl SearchAggregator {
pub fn from_query(query: &SearchQuery, request: &HttpRequest) -> Self {
let mut ret = Self::default();
ret.total_received = 1;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(ref sort) = query.sort {
ret.sort_total_number_of_criteria = 1;
ret.sort_with_geo_point = sort.iter().any(|s| s.contains("_geoPoint("));
ret.sort_sum_of_criteria_terms = sort.len();
}
if let Some(ref filter) = query.filter {
static RE: Lazy<Regex> = Lazy::new(|| Regex::new("AND | OR").unwrap());
ret.filter_total_number_of_criteria = 1;
let syntax = match filter {
Value::String(_) => "string".to_string(),
Value::Array(values) => {
if values
.iter()
.map(|v| v.to_string())
.any(|s| RE.is_match(&s))
{
"mixed".to_string()
} else {
"array".to_string()
}
}
_ => "none".to_string(),
};
// convert the string to a HashMap
ret.used_syntax.insert(syntax, 1);
let stringified_filters = filter.to_string();
ret.filter_with_geo_radius = stringified_filters.contains("_geoRadius(");
ret.filter_sum_of_criteria_terms = RE.split(&stringified_filters).count();
}
if let Some(ref q) = query.q {
ret.total_number_of_q = 1;
ret.sum_of_terms_count = q.split_whitespace().count();
}
ret.max_limit = query.limit;
ret.max_offset = query.offset.unwrap_or_default();
ret
}
pub fn finish(&mut self, result: &SearchResult) {
self.total_succeeded += 1;
self.time_spent.push(result.processing_time_ms as usize);
}
/// Aggregate one [SearchAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
// context
for user_agent in other.user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received += other.total_received;
self.total_succeeded += other.total_succeeded;
self.time_spent.append(&mut other.time_spent);
// sort
self.sort_with_geo_point |= other.sort_with_geo_point;
self.sort_sum_of_criteria_terms += other.sort_sum_of_criteria_terms;
self.sort_total_number_of_criteria += other.sort_total_number_of_criteria;
// filter
self.filter_with_geo_radius |= other.filter_with_geo_radius;
self.filter_sum_of_criteria_terms += other.filter_sum_of_criteria_terms;
self.filter_total_number_of_criteria += other.filter_total_number_of_criteria;
for (key, value) in other.used_syntax.into_iter() {
*self.used_syntax.entry(key).or_insert(0) += value;
}
// q
self.sum_of_terms_count += other.sum_of_terms_count;
self.total_number_of_q += other.total_number_of_q;
// pagination
self.max_limit = self.max_limit.max(other.max_limit);
self.max_offset = self.max_offset.max(other.max_offset);
}
pub fn into_event(mut self, user: &User, event_name: &str) -> Option<Track> {
if self.total_received == 0 {
None
} else {
let percentile_99th = 0.99 * (self.total_succeeded as f64 - 1.) + 1.;
self.time_spent.drain(percentile_99th as usize..);
let properties = json!({
"user-agent": self.user_agents,
"requests": {
"99th_response_time": format!("{:.2}", self.time_spent.iter().sum::<usize>() as f64 / self.time_spent.len() as f64),
"total_succeeded": self.total_succeeded,
"total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panics
"total_received": self.total_received,
},
"sort": {
"with_geoPoint": self.sort_with_geo_point,
"avg_criteria_number": format!("{:.2}", self.sort_sum_of_criteria_terms as f64 / self.sort_total_number_of_criteria as f64),
},
"filter": {
"with_geoRadius": self.filter_with_geo_radius,
"avg_criteria_number": format!("{:.2}", self.filter_sum_of_criteria_terms as f64 / self.filter_total_number_of_criteria as f64),
"most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
},
"q": {
"avg_terms_number": format!("{:.2}", self.sum_of_terms_count as f64 / self.total_number_of_q as f64),
},
"pagination": {
"max_limit": self.max_limit,
"max_offset": self.max_offset,
},
});
Some(Track {
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}
#[derive(Default)]
pub struct DocumentsAggregator {
// set to true when at least one request was received
updated: bool,
// context
user_agents: HashSet<String>,
content_types: HashSet<String>,
primary_keys: HashSet<String>,
index_creation: bool,
}
impl DocumentsAggregator {
pub fn from_query(
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) -> Self {
let mut ret = Self::default();
ret.updated = true;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(primary_key) = documents_query.primary_key.clone() {
ret.primary_keys.insert(primary_key);
}
let content_type = request
.headers()
.get(CONTENT_TYPE)
.map(|s| s.to_str().unwrap_or("unkown"))
.unwrap()
.to_string();
ret.content_types.insert(content_type);
ret.index_creation = index_creation;
ret
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
self.updated |= other.updated;
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
for primary_key in other.primary_keys.into_iter() {
self.primary_keys.insert(primary_key);
}
for content_type in other.content_types.into_iter() {
self.content_types.insert(content_type);
}
self.index_creation |= other.index_creation;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
if !self.updated {
None
} else {
let properties = json!({
"user-agent": self.user_agents,
"payload_type": self.content_types,
"primary_key": self.primary_keys,
"index_creation": self.index_creation,
});
Some(Track {
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}


@ -3,16 +3,17 @@
pub mod error;
#[macro_use]
pub mod extractors;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
pub mod analytics;
pub mod helpers;
pub mod option;
pub mod routes;
use std::sync::Arc;
use std::time::Duration;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::AuthConfig;
use actix_web::error::JsonPayloadError;
use analytics::Analytics;
use error::PayloadError;
use http::header::CONTENT_TYPE;
pub use option::Opt;
@ -74,10 +75,16 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone())
}
pub fn configure_data(config: &mut web::ServiceConfig, data: MeiliSearch, opt: &Opt) {
pub fn configure_data(
config: &mut web::ServiceConfig,
data: MeiliSearch,
opt: &Opt,
analytics: Arc<dyn Analytics>,
) {
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
config
.app_data(data)
.app_data(web::Data::from(analytics))
.app_data(
web::JsonConfig::default()
.content_type(|mime| mime == mime::APPLICATION_JSON)
@ -168,7 +175,7 @@ pub fn dashboard(config: &mut web::ServiceConfig, _enable_frontend: bool) {
#[macro_export]
macro_rules! create_app {
($data:expr, $enable_frontend:expr, $opt:expr) => {{
($data:expr, $enable_frontend:expr, $opt:expr, $analytics:expr) => {{
use actix_cors::Cors;
use actix_web::middleware::TrailingSlash;
use actix_web::App;
@ -178,7 +185,7 @@ macro_rules! create_app {
use meilisearch_http::{configure_auth, configure_data, dashboard};
App::new()
.configure(|s| configure_data(s, $data.clone(), &$opt))
.configure(|s| configure_data(s, $data.clone(), &$opt, $analytics))
.configure(|s| configure_auth(s, &$opt))
.configure(routes::configure)
.configure(|s| dashboard(s, $enable_frontend))


@ -1,13 +1,13 @@
use std::env;
use std::sync::Arc;
use actix_web::HttpServer;
use meilisearch_http::analytics;
use meilisearch_http::analytics::Analytics;
use meilisearch_http::{create_app, setup_meilisearch, Opt};
use meilisearch_lib::MeiliSearch;
use structopt::StructOpt;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
use meilisearch_http::analytics;
#[cfg(target_os = "linux")]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@ -47,25 +47,32 @@ async fn main() -> anyhow::Result<()> {
let meilisearch = setup_meilisearch(&opt)?;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
if !opt.no_analytics {
let analytics_data = meilisearch.clone();
let analytics_opt = opt.clone();
tokio::task::spawn(analytics::analytics_sender(analytics_data, analytics_opt));
}
let (analytics, user) = if !opt.no_analytics {
analytics::SegmentAnalytics::new(&opt, &meilisearch).await
} else {
analytics::MockAnalytics::new(&opt)
};
#[cfg(any(debug_assertions, not(feature = "analytics")))]
let (analytics, user) = analytics::MockAnalytics::new(&opt);
print_launch_resume(&opt);
print_launch_resume(&opt, &user);
run_http(meilisearch, opt).await?;
run_http(meilisearch, opt, analytics).await?;
Ok(())
}
async fn run_http(data: MeiliSearch, opt: Opt) -> anyhow::Result<()> {
async fn run_http(
data: MeiliSearch,
opt: Opt,
analytics: Arc<dyn Analytics>,
) -> anyhow::Result<()> {
let _enable_dashboard = &opt.env == "development";
let opt_clone = opt.clone();
let http_server = HttpServer::new(move || create_app!(data, _enable_dashboard, opt_clone))
// Disabling signals allows the server to terminate immediately when a user enters CTRL-C
.disable_signals();
let http_server =
HttpServer::new(move || create_app!(data, _enable_dashboard, opt_clone, analytics.clone()))
// Disabling signals allows the server to terminate immediately when a user enters CTRL-C
.disable_signals();
if let Some(config) = opt.get_ssl_config()? {
http_server
@ -78,7 +85,7 @@ async fn run_http(data: MeiliSearch, opt: Opt) -> anyhow::Result<()> {
Ok(())
}
pub fn print_launch_resume(opt: &Opt) {
pub fn print_launch_resume(opt: &Opt, user: &str) {
let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");
@ -116,11 +123,15 @@ Thank you for using MeiliSearch!
We collect anonymized analytics to improve our product and your experience. To learn more, including how to turn off analytics, visit our dedicated documentation page: https://docs.meilisearch.com/learn/what_is_meilisearch/telemetry.html
Anonymous telemetry: \"Enabled\""
Anonymous telemetry:\t\"Enabled\""
);
}
}
if !user.is_empty() {
eprintln!("Instance UID:\t\t\"{}\"", user);
}
eprintln!();
if opt.master_key.is_some() {


@ -1,8 +1,10 @@
use actix_web::{web, HttpResponse};
use actix_web::{web, HttpRequest, HttpResponse};
use log::debug;
use meilisearch_lib::MeiliSearch;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::analytics::Analytics;
use crate::error::ResponseError;
use crate::extractors::authentication::{policies::*, GuardedData};
@ -13,7 +15,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
pub async fn create_dump(
meilisearch: GuardedData<Private, MeiliSearch>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
let res = meilisearch.create_dump().await?;
debug!("returns: {:?}", res);


@ -11,6 +11,7 @@ use serde::Deserialize;
use serde_json::Value;
use tokio::sync::mpsc;
use crate::analytics::Analytics;
use crate::error::{MeilisearchHttpError, ResponseError};
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::payload::Payload;
@ -122,7 +123,7 @@ pub async fn get_all_documents(
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct UpdateDocumentsQuery {
primary_key: Option<String>,
pub primary_key: Option<String>,
}
pub async fn add_documents(
@ -131,15 +132,26 @@ pub async fn add_documents(
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params);
let content_type = req
.headers()
.get("Content-type")
.map(|s| s.to_str().unwrap_or("unkown"));
let params = params.into_inner();
analytics.add_documents(
&params,
meilisearch.get_index(path.index_uid.clone()).await.is_err(),
&req,
);
document_addition(
req.headers()
.get("Content-type")
.map(|s| s.to_str().unwrap_or("unkown")),
content_type,
meilisearch,
path.into_inner().index_uid,
params.into_inner().primary_key,
path.index_uid.clone(),
params.primary_key,
body,
IndexDocumentsMethod::ReplaceDocuments,
)
@ -152,12 +164,22 @@ pub async fn update_documents(
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params);
let content_type = req
.headers()
.get("Content-type")
.map(|s| s.to_str().unwrap_or("unkown"));
analytics.update_documents(
&params,
meilisearch.get_index(path.index_uid.clone()).await.is_err(),
&req,
);
document_addition(
req.headers()
.get("Content-type")
.map(|s| s.to_str().unwrap_or("unkown")),
content_type,
meilisearch,
path.into_inner().index_uid,
params.into_inner().primary_key,


@ -1,10 +1,12 @@
use actix_web::{web, HttpResponse};
use actix_web::{web, HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use log::debug;
use meilisearch_lib::index_controller::IndexSettings;
use meilisearch_lib::MeiliSearch;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::analytics::Analytics;
use crate::error::ResponseError;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::routes::IndexParam;
@ -54,8 +56,16 @@ pub struct IndexCreateRequest {
pub async fn create_index(
meilisearch: GuardedData<Private, MeiliSearch>,
body: web::Json<IndexCreateRequest>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let body = body.into_inner();
analytics.publish(
"Index Created".to_string(),
json!({ "primary_key": body.primary_key}),
Some(&req),
);
let meta = meilisearch.create_index(body.uid, body.primary_key).await?;
Ok(HttpResponse::Created().json(meta))
}
@ -90,9 +100,16 @@ pub async fn update_index(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
body: web::Json<UpdateIndexRequest>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", body);
let body = body.into_inner();
analytics.publish(
"Index Updated".to_string(),
json!({ "primary_key": body.primary_key}),
Some(&req),
);
let settings = IndexSettings {
uid: body.uid,
primary_key: body.primary_key,


@ -1,10 +1,11 @@
use actix_web::{web, HttpResponse};
use actix_web::{web, HttpRequest, HttpResponse};
use log::debug;
use meilisearch_lib::index::{default_crop_length, SearchQuery, DEFAULT_SEARCH_LIMIT};
use meilisearch_lib::MeiliSearch;
use serde::Deserialize;
use serde_json::Value;
use crate::analytics::{Analytics, SearchAggregator};
use crate::error::ResponseError;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::routes::IndexParam;
@ -109,9 +110,14 @@ pub async fn search_with_url_query(
meilisearch: GuardedData<Public, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<SearchQueryGet>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params);
let query = params.into_inner().into();
let query: SearchQuery = params.into_inner().into();
let mut aggregate = SearchAggregator::from_query(&query, &req);
let search_result = meilisearch
.search(path.into_inner().index_uid, query)
.await?;
@ -120,6 +126,9 @@ pub async fn search_with_url_query(
#[cfg(test)]
assert!(!search_result.exhaustive_nb_hits);
aggregate.finish(&search_result);
analytics.get_search(aggregate);
debug!("returns: {:?}", search_result);
Ok(HttpResponse::Ok().json(search_result))
}
@ -128,16 +137,25 @@ pub async fn search_with_post(
meilisearch: GuardedData<Public, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Json<SearchQuery>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!("search called with params: {:?}", params);
let query = params.into_inner();
debug!("search called with params: {:?}", query);
let mut aggregate = SearchAggregator::from_query(&query, &req);
let search_result = meilisearch
.search(path.into_inner().index_uid, params.into_inner())
.search(path.into_inner().index_uid, query)
.await?;
// Tests that the nb_hits is always set to false
#[cfg(test)]
assert!(!search_result.exhaustive_nb_hits);
aggregate.finish(&search_result);
analytics.post_search(aggregate);
debug!("returns: {:?}", search_result);
Ok(HttpResponse::Ok().json(search_result))
}


@ -1,23 +1,26 @@
use log::debug;
use actix_web::{web, HttpResponse};
use actix_web::{web, HttpRequest, HttpResponse};
use meilisearch_lib::index::{Settings, Unchecked};
use meilisearch_lib::index_controller::Update;
use meilisearch_lib::MeiliSearch;
use serde_json::json;
use crate::analytics::Analytics;
use crate::error::ResponseError;
use crate::extractors::authentication::{policies::*, GuardedData};
#[macro_export]
macro_rules! make_setting_route {
($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal) => {
($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal, $analytics_var:ident, $analytics:expr) => {
pub mod $attr {
use log::debug;
use actix_web::{web, HttpResponse, Resource};
use actix_web::{web, HttpResponse, HttpRequest, Resource};
use meilisearch_lib::milli::update::Setting;
use meilisearch_lib::{MeiliSearch, index::Settings, index_controller::Update};
use crate::analytics::Analytics;
use crate::error::ResponseError;
use crate::extractors::authentication::{GuardedData, policies::*};
@ -39,9 +42,15 @@ macro_rules! make_setting_route {
meilisearch: GuardedData<Private, MeiliSearch>,
index_uid: actix_web::web::Path<String>,
body: actix_web::web::Json<Option<$type>>,
req: HttpRequest,
$analytics_var: web::Data< dyn Analytics>,
) -> std::result::Result<HttpResponse, ResponseError> {
let body = body.into_inner();
$analytics(&body, &req);
let settings = Settings {
$attr: match body.into_inner() {
$attr: match body {
Some(inner_body) => Setting::Set(inner_body),
None => Setting::Reset
},
@ -73,20 +82,53 @@ macro_rules! make_setting_route {
}
}
};
($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal) => {
make_setting_route!($route, $type, $attr, $camelcase_attr, _analytics, |_, _| {});
};
}
make_setting_route!(
"/filterable-attributes",
std::collections::BTreeSet<String>,
filterable_attributes,
"filterableAttributes"
"filterableAttributes",
analytics,
|setting: &Option<std::collections::BTreeSet<String>>, req: &HttpRequest| {
use serde_json::json;
analytics.publish(
"FilterableAttributes Updated".to_string(),
json!({
"filterable_attributes": {
"total": setting.as_ref().map(|filter| filter.len()).unwrap_or(0),
"has_geo": setting.as_ref().map(|filter| filter.contains("_geo")).unwrap_or(false),
}
}),
Some(req),
);
}
);
make_setting_route!(
"/sortable-attributes",
std::collections::BTreeSet<String>,
sortable_attributes,
"sortableAttributes"
"sortableAttributes",
analytics,
|setting: &Option<std::collections::BTreeSet<String>>, req: &HttpRequest| {
use serde_json::json;
analytics.publish(
"SortableAttributes Updated".to_string(),
json!({
"sortable_attributes": {
"total": setting.as_ref().map(|sort| sort.len()).unwrap_or(0),
"has_geo": setting.as_ref().map(|sort| sort.contains("_geo")).unwrap_or(false),
},
}),
Some(req),
);
}
);
make_setting_route!(
@ -100,7 +142,21 @@ make_setting_route!(
"/searchable-attributes",
Vec<String>,
searchable_attributes,
"searchableAttributes"
"searchableAttributes",
analytics,
|setting: &Option<Vec<String>>, req: &HttpRequest| {
use serde_json::json;
analytics.publish(
"SearchableAttributes Updated".to_string(),
json!({
"searchable_attributes": {
"total": setting.as_ref().map(|sort| sort.len()).unwrap_or(0),
},
}),
Some(req),
);
}
);
make_setting_route!(
@ -124,7 +180,26 @@ make_setting_route!(
"distinctAttribute"
);
make_setting_route!("/ranking-rules", Vec<String>, ranking_rules, "rankingRules");
make_setting_route!(
"/ranking-rules",
Vec<String>,
ranking_rules,
"rankingRules",
analytics,
|setting: &Option<Vec<String>>, req: &HttpRequest| {
use serde_json::json;
analytics.publish(
"RankingRules Updated".to_string(),
json!({
"ranking_rules": {
"sort_position": setting.as_ref().map(|sort| sort.iter().position(|s| s == "sort")),
}
}),
Some(req),
);
}
);
macro_rules! generate_configure {
($($mod:ident),*) => {
@ -154,9 +229,29 @@ pub async fn update_all(
meilisearch: GuardedData<Private, MeiliSearch>,
index_uid: web::Path<String>,
body: web::Json<Settings<Unchecked>>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let settings = body.into_inner();
analytics.publish(
"Settings Updated".to_string(),
json!({
"ranking_rules": {
"sort_position": settings.ranking_rules.as_ref().set().map(|sort| sort.iter().position(|s| s == "sort")),
},
"sortable_attributes": {
"total": settings.sortable_attributes.as_ref().set().map(|sort| sort.len()).unwrap_or(0),
"has_geo": settings.sortable_attributes.as_ref().set().map(|sort| sort.iter().any(|s| s == "_geo")).unwrap_or(false),
},
"filterable_attributes": {
"total": settings.filterable_attributes.as_ref().set().map(|filter| filter.len()).unwrap_or(0),
"has_geo": settings.filterable_attributes.as_ref().set().map(|filter| filter.iter().any(|s| s == "_geo")).unwrap_or(false),
},
}),
Some(&req),
);
let update = Update::Settings(settings);
let update_result = meilisearch
.register_update(index_uid.into_inner(), update, true)


@ -13,13 +13,6 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::resource("{update_id}").route(web::get().to(get_update_status)));
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct UpdateIndexRequest {
uid: Option<String>,
primary_key: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateIndexResponse {


@ -14,7 +14,7 @@ use crate::extractors::authentication::{policies::*, GuardedData};
use crate::ApiKeys;
mod dump;
mod indexes;
pub mod indexes;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("/health").route(web::get().to(get_health)))
@ -302,6 +302,7 @@ mod test {
impl_is_policy! {A B}
impl_is_policy! {A B C}
impl_is_policy! {A B C D}
impl_is_policy! {A B C D E}
/// Emits a compile error if a route doesn't have the correct authentication policy.
///


@ -2,7 +2,7 @@ use actix_web::{http::StatusCode, test};
use meilisearch_lib::MeiliSearch;
use serde_json::Value;
use meilisearch_http::{create_app, Opt};
use meilisearch_http::{analytics, create_app, Opt};
pub struct Service {
pub meilisearch: MeiliSearch,
@ -11,7 +11,13 @@ pub struct Service {
impl Service {
pub async fn post(&self, url: impl AsRef<str>, body: Value) -> (Value, StatusCode) {
let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await;
let app = test::init_service(create_app!(
&self.meilisearch,
true,
&self.options,
analytics::MockAnalytics::new(&self.options).0
))
.await;
let req = test::TestRequest::post()
.uri(url.as_ref())
@ -31,7 +37,13 @@ impl Service {
url: impl AsRef<str>,
body: impl AsRef<str>,
) -> (Value, StatusCode) {
let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await;
let app = test::init_service(create_app!(
&self.meilisearch,
true,
&self.options,
analytics::MockAnalytics::new(&self.options).0
))
.await;
let req = test::TestRequest::post()
.uri(url.as_ref())
@ -47,7 +59,13 @@ impl Service {
}
pub async fn get(&self, url: impl AsRef<str>) -> (Value, StatusCode) {
let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await;
let app = test::init_service(create_app!(
&self.meilisearch,
true,
&self.options,
analytics::MockAnalytics::new(&self.options).0
))
.await;
let req = test::TestRequest::get().uri(url.as_ref()).to_request();
let res = test::call_service(&app, req).await;
@ -59,7 +77,13 @@ impl Service {
}
pub async fn put(&self, url: impl AsRef<str>, body: Value) -> (Value, StatusCode) {
let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await;
let app = test::init_service(create_app!(
&self.meilisearch,
true,
&self.options,
analytics::MockAnalytics::new(&self.options).0
))
.await;
let req = test::TestRequest::put()
.uri(url.as_ref())
@ -74,7 +98,13 @@ impl Service {
}
pub async fn delete(&self, url: impl AsRef<str>) -> (Value, StatusCode) {
let app = test::init_service(create_app!(&self.meilisearch, true, &self.options)).await;
let app = test::init_service(create_app!(
&self.meilisearch,
true,
&self.options,
analytics::MockAnalytics::new(&self.options).0
))
.await;
let req = test::TestRequest::delete().uri(url.as_ref()).to_request();
let res = test::call_service(&app, req).await;


@ -4,7 +4,7 @@ mod common;
use crate::common::Server;
use actix_web::test;
use meilisearch_http::create_app;
use meilisearch_http::{analytics, create_app};
use serde_json::{json, Value};
#[actix_rt::test]
@ -40,7 +40,8 @@ async fn error_json_bad_content_type() {
let app = test::init_service(create_app!(
&server.service.meilisearch,
true,
&server.service.options
&server.service.options,
analytics::MockAnalytics::new(&server.service.options).0
))
.await;
for route in routes {


@@ -1,7 +1,7 @@
 use crate::common::{GetAllDocumentsOptions, Server};
 use actix_web::test;
 use chrono::DateTime;
-use meilisearch_http::create_app;
+use meilisearch_http::{analytics, create_app};
 use serde_json::{json, Value};
 
 /// This is the basic usage of our API and every other test uses the content-type application/json
@@ -19,7 +19,8 @@ async fn add_documents_test_json_content_types() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -63,7 +64,8 @@ async fn error_add_documents_test_bad_content_types() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -129,7 +131,8 @@ async fn error_add_documents_test_no_content_type() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -187,7 +190,8 @@ async fn error_add_malformed_csv_documents() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -247,7 +251,8 @@ async fn error_add_malformed_json_documents() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -307,7 +312,8 @@ async fn error_add_malformed_ndjson_documents() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -367,7 +373,8 @@ async fn error_add_missing_payload_csv_documents() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -417,7 +424,8 @@ async fn error_add_missing_payload_json_documents() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post
@@ -467,7 +475,8 @@ async fn error_add_missing_payload_ndjson_documents() {
     let app = test::init_service(create_app!(
         &server.service.meilisearch,
         true,
-        &server.service.options
+        &server.service.options,
+        analytics::MockAnalytics::new(&server.service.options).0
     ))
     .await;
     // post

---

@@ -0,0 +1,8 @@
+use std::{fs, path::Path};
+
+/// Copy the `instance-uid` contained in one db to another. Ignore all errors.
+pub fn copy_user_id(src: &Path, dst: &Path) {
+    if let Ok(user_id) = fs::read_to_string(src.join("instance-uid")) {
+        let _ = fs::write(dst.join("instance-uid"), &user_id);
+    }
+}
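A quick round-trip shows the intended contract of this helper. The test below is hypothetical (the helper body is repeated so the snippet compiles on its own, and `tempfile` is assumed as a dev-dependency, as elsewhere in this PR):

```rust
use std::{fs, path::Path};

pub fn copy_user_id(src: &Path, dst: &Path) {
    if let Ok(user_id) = fs::read_to_string(src.join("instance-uid")) {
        let _ = fs::write(dst.join("instance-uid"), &user_id);
    }
}

fn main() -> std::io::Result<()> {
    let src = tempfile::tempdir()?;
    let dst = tempfile::tempdir()?;

    // When the source db has an instance-uid, it is carried over...
    fs::write(src.path().join("instance-uid"), "dummy-uid")?;
    copy_user_id(src.path(), dst.path());
    assert_eq!(fs::read_to_string(dst.path().join("instance-uid"))?, "dummy-uid");

    // ...and a missing or unreadable file is silently ignored.
    let empty = tempfile::tempdir()?;
    copy_user_id(empty.path(), dst.path());

    Ok(())
}
```

The diffs below call this helper from three places — dump import, dump creation, and snapshot creation — so an instance restored from a dump or snapshot keeps the same anonymous identifier instead of being counted as a new installation.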

---

@@ -22,6 +22,7 @@ pub struct DumpActor<U, I> {
     index_resolver: Arc<IndexResolver<U, I>>,
     update: UpdateSender,
     dump_path: PathBuf,
+    analytics_path: PathBuf,
     lock: Arc<Mutex<()>>,
     dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
     update_db_size: usize,
@@ -43,6 +44,7 @@ where
         index_resolver: Arc<IndexResolver<U, I>>,
         update: UpdateSender,
         dump_path: impl AsRef<Path>,
+        analytics_path: impl AsRef<Path>,
         index_db_size: usize,
         update_db_size: usize,
     ) -> Self {
@@ -53,6 +55,7 @@ where
             index_resolver,
             update,
             dump_path: dump_path.as_ref().into(),
+            analytics_path: analytics_path.as_ref().into(),
             dump_infos,
             lock,
             index_db_size,
@@ -118,7 +121,8 @@ where
         ret.send(Ok(info)).expect("Dump actor is dead");
 
         let task = DumpTask {
-            path: self.dump_path.clone(),
+            dump_path: self.dump_path.clone(),
+            db_path: self.analytics_path.clone(),
             index_resolver: self.index_resolver.clone(),
             update_sender: self.update.clone(),
             uid: uid.clone(),

---

@@ -33,6 +33,7 @@ impl DumpActorHandle for DumpActorHandleImpl {
 impl DumpActorHandleImpl {
     pub fn new(
         path: impl AsRef<Path>,
+        analytics_path: impl AsRef<Path>,
         index_resolver: Arc<HardStateIndexResolver>,
         update: crate::index_controller::updates::UpdateSender,
         index_db_size: usize,
@@ -44,6 +45,7 @@ impl DumpActorHandleImpl {
             index_resolver,
             update,
             path,
+            analytics_path,
             index_db_size,
             update_db_size,
         );

---

@@ -2,6 +2,7 @@ use std::path::Path;
 
 use log::info;
 
+use crate::analytics;
 use crate::index_controller::dump_actor::Metadata;
 use crate::index_controller::index_resolver::IndexResolver;
 use crate::index_controller::update_file_store::UpdateFileStore;
@@ -24,6 +25,7 @@ pub fn load_dump(
     IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?;
     UpdateFileStore::load_dump(src.as_ref(), &dst)?;
     UpdateStore::load_dump(&src, &dst, update_db_size)?;
+    analytics::copy_user_id(src.as_ref(), dst.as_ref());
 
     info!("Loading indexes.");

---

@@ -17,6 +17,7 @@ use super::index_resolver::index_store::IndexStore;
 use super::index_resolver::uuid_store::UuidStore;
 use super::index_resolver::IndexResolver;
 use super::updates::UpdateSender;
+use crate::analytics;
 use crate::compression::{from_tar_gz, to_tar_gz};
 use crate::index_controller::dump_actor::error::DumpActorError;
 use crate::index_controller::dump_actor::loaders::{v2, v3};
@@ -222,7 +223,8 @@ pub fn load_dump(
 }
 
 struct DumpTask<U, I> {
-    path: PathBuf,
+    dump_path: PathBuf,
+    db_path: PathBuf,
     index_resolver: Arc<IndexResolver<U, I>>,
     update_sender: UpdateSender,
     uid: String,
@@ -238,7 +240,7 @@ where
     async fn run(self) -> Result<()> {
         trace!("Performing dump.");
 
-        create_dir_all(&self.path).await?;
+        create_dir_all(&self.dump_path).await?;
 
         let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
         let temp_dump_path = temp_dump_dir.path().to_owned();
@@ -247,6 +249,7 @@ where
         let meta_path = temp_dump_path.join(META_FILE_NAME);
         let mut meta_file = File::create(&meta_path)?;
         serde_json::to_writer(&mut meta_file, &meta)?;
+        analytics::copy_user_id(&self.db_path, &temp_dump_path);
 
         create_dir_all(&temp_dump_path.join("indexes")).await?;
         let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
@@ -254,11 +257,11 @@ where
         UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?;
 
         let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
-            let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
+            let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?;
             to_tar_gz(temp_dump_path, temp_dump_file.path())
                 .map_err(|e| DumpActorError::Internal(e.into()))?;
 
-            let dump_path = self.path.join(self.uid).with_extension("dump");
+            let dump_path = self.dump_path.join(self.uid).with_extension("dump");
             temp_dump_file.persist(&dump_path)?;
 
             Ok(dump_path)
@@ -338,7 +341,9 @@ mod test {
         create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
 
         let task = DumpTask {
-            path: tmp.path().to_owned(),
+            dump_path: tmp.path().into(),
+            // this should do nothing
+            db_path: tmp.path().into(),
             index_resolver,
             update_sender,
             uid: String::from("test"),
@@ -366,7 +371,9 @@ mod test {
         create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
 
         let task = DumpTask {
-            path: tmp.path().to_owned(),
+            dump_path: tmp.path().into(),
+            // this should do nothing
+            db_path: tmp.path().into(),
            index_resolver,
             update_sender,
             uid: String::from("test"),
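One detail worth noting in `DumpTask::run`: the archive is written to a `NamedTempFile` created *inside* `dump_path` and then moved into place with `persist`. A reduced sketch of that staging pattern (directory name and payload are placeholders):

```rust
use std::io::Write;
use std::path::PathBuf;

fn main() -> std::io::Result<()> {
    let dump_path = PathBuf::from("dumps"); // placeholder destination dir
    std::fs::create_dir_all(&dump_path)?;

    // Creating the temp file inside the destination directory keeps
    // `persist` a same-filesystem rename, so a reader can never observe
    // a half-written `<uid>.dump`.
    let mut temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
    temp_dump_file.write_all(b"tar.gz bytes would go here")?;

    let final_path = dump_path.join("test").with_extension("dump");
    temp_dump_file.persist(&final_path)?;
    Ok(())
}
```

Copying the `instance-uid` into the temp directory before `to_tar_gz` runs is what makes it land inside the archive.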

---

@@ -169,8 +169,10 @@ impl IndexControllerBuilder {
         let dump_path = self
             .dump_dst
             .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;
+        let analytics_path = db_path.as_ref().join("instance-uid");
         let dump_handle = dump_actor::DumpActorHandleImpl::new(
             dump_path,
+            analytics_path,
             index_resolver.clone(),
             update_sender.clone(),
             index_size,
@@ -187,6 +189,7 @@ impl IndexControllerBuilder {
                 .ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?,
             self.snapshot_dir
                 .ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?,
+            db_path.as_ref().into(),
             db_path
                 .as_ref()
                 .file_name()

---

@@ -8,6 +8,7 @@ use tokio::fs;
 use tokio::task::spawn_blocking;
 use tokio::time::sleep;
 
+use crate::analytics;
 use crate::compression::from_tar_gz;
 use crate::index_controller::updates::UpdateMsg;
 
@@ -21,6 +22,7 @@ pub struct SnapshotService<U, I> {
     update_sender: UpdateSender,
     snapshot_period: Duration,
     snapshot_path: PathBuf,
+    db_path: PathBuf,
     db_name: String,
 }
 
@@ -34,6 +36,7 @@ where
         update_sender: UpdateSender,
         snapshot_period: Duration,
         snapshot_path: PathBuf,
+        db_path: PathBuf,
         db_name: String,
     ) -> Self {
         Self {
@@ -41,6 +44,7 @@ where
             update_sender,
             snapshot_period,
             snapshot_path,
+            db_path,
             db_name,
         }
     }
@@ -71,6 +75,8 @@ where
             .snapshot(temp_snapshot_path.clone())
             .await?;
 
+        analytics::copy_user_id(&self.db_path, &temp_snapshot_path.clone());
+
         if indexes.is_empty() {
             return Ok(());
         }
@@ -211,6 +217,8 @@ mod test {
             update_sender,
             Duration::from_millis(100),
             snapshot_path.path().to_owned(),
+            // this should do nothing
+            snapshot_path.path().to_owned(),
             "data.ms".to_string(),
         );
@@ -243,6 +251,8 @@ mod test {
             update_sender,
             Duration::from_millis(100),
             snapshot_path.path().to_owned(),
+            // this should do nothing
+            snapshot_path.path().to_owned(),
             "data.ms".to_string(),
         );
@@ -292,6 +302,8 @@ mod test {
             update_sender,
             Duration::from_millis(100),
             snapshot_path.path().to_owned(),
+            // this should do nothing
+            snapshot_path.path().to_owned(),
             "data.ms".to_string(),
         );
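For orientation, the snapshot service drives all of this from a sleep-based loop. The sketch below is a simplified assumption about that control flow (names and error handling are illustrative; the real service coordinates index snapshots over channels):

```rust
use std::path::{Path, PathBuf};
use std::time::Duration;

async fn run_snapshot_loop(period: Duration, db_path: PathBuf, snapshot_path: PathBuf) {
    loop {
        // Sleeping between runs (rather than ticking on a fixed schedule)
        // guarantees that a slow snapshot never overlaps the next one.
        tokio::time::sleep(period).await;

        if let Err(e) = perform_snapshot(&db_path, &snapshot_path).await {
            log::error!("snapshot failed: {}", e);
        }
    }
}

async fn perform_snapshot(_db_path: &Path, _snapshot_path: &Path) -> anyhow::Result<()> {
    // Stage the indexes, copy the instance-uid, tar-gz, persist atomically.
    Ok(())
}
```

The `// this should do nothing` test setups above rely on exactly the ignore-all-errors behavior of `copy_user_id`: pointing `db_path` at a directory with no `instance-uid` makes the copy a no-op.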

---

@@ -5,6 +5,8 @@ pub mod options;
 pub mod index;
 pub mod index_controller;
 
+mod analytics;
+
 pub use index_controller::updates::store::Update;
 pub use index_controller::MeiliSearch;