Merge branch 'master' into issue943
commit f4d918d22a
45 changed files with 2490 additions and 777 deletions
meilisearch-http/src/backup.rs (new file, 416 lines)
@@ -0,0 +1,416 @@
use std::fs::{create_dir_all, File};
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;

use actix_web::web;
use chrono::offset::Utc;
use indexmap::IndexMap;
use log::error;
use meilisearch_core::{MainWriter, MainReader, UpdateReader};
use meilisearch_core::settings::Settings;
use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;

use crate::Data;
use crate::error::Error;
use crate::helpers::compression;
use crate::routes::index;
use crate::routes::index::IndexResponse;

// Mutex to share backup progress.
static BACKUP_INFO: Lazy<Mutex<Option<BackupInfo>>> = Lazy::new(Mutex::default);

#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum BackupVersion {
    V1,
}

impl BackupVersion {
    const CURRENT: Self = Self::V1;
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackupMetadata {
    indexes: Vec<crate::routes::index::IndexResponse>,
    db_version: String,
    backup_version: BackupVersion,
}

impl BackupMetadata {
    /// Create a `BackupMetadata` with the current backup version of MeiliSearch.
    pub fn new(indexes: Vec<crate::routes::index::IndexResponse>, db_version: String) -> Self {
        BackupMetadata {
            indexes,
            db_version,
            backup_version: BackupVersion::CURRENT,
        }
    }

    /// Extract `BackupMetadata` from the `metadata.json` file at the provided `folder_path`.
    fn from_path(folder_path: &Path) -> Result<Self, Error> {
        let path = folder_path.join("metadata.json");
        let file = File::open(path)?;
        let reader = std::io::BufReader::new(file);
        let metadata = serde_json::from_reader(reader)?;

        Ok(metadata)
    }

    /// Write `BackupMetadata` to the `metadata.json` file at the provided `folder_path`.
    fn to_path(&self, folder_path: &Path) -> Result<(), Error> {
        let path = folder_path.join("metadata.json");
        let file = File::create(path)?;

        serde_json::to_writer(file, &self)?;

        Ok(())
    }
}

/// Extract `Settings` from the `settings.json` file at the provided `folder_path`.
fn settings_from_path(folder_path: &Path) -> Result<Settings, Error> {
    let path = folder_path.join("settings.json");
    let file = File::open(path)?;
    let reader = std::io::BufReader::new(file);
    let metadata = serde_json::from_reader(reader)?;

    Ok(metadata)
}

/// Write `Settings` to the `settings.json` file at the provided `folder_path`.
fn settings_to_path(settings: &Settings, folder_path: &Path) -> Result<(), Error> {
    let path = folder_path.join("settings.json");
    let file = File::create(path)?;

    serde_json::to_writer(file, settings)?;

    Ok(())
}

/// Import the settings and documents of a backup with version `BackupVersion::V1` into the specified index.
fn import_index_v1(
    data: &Data,
    backup_folder: &Path,
    index_uid: &str,
    document_batch_size: usize,
    write_txn: &mut MainWriter,
) -> Result<(), Error> {

    // open the index
    let index = data
        .db
        .open_index(index_uid)
        .ok_or(Error::index_not_found(index_uid))?;

    // index folder path inside the backup folder
    let index_path = &backup_folder.join(index_uid);

    // extract the `settings.json` file and import its content
    let settings = settings_from_path(&index_path)?;
    let settings = settings.to_update().or_else(|_e| Err(Error::backup_failed()))?;
    apply_settings_update(write_txn, &index, settings)?;

    // create an iterator over the documents in `documents.jsonl` to import them in batches
    let documents = {
        let file = File::open(&index_path.join("documents.jsonl"))?;
        let reader = std::io::BufReader::new(file);
        let deserializer = serde_json::Deserializer::from_reader(reader);
        deserializer.into_iter::<IndexMap<String, serde_json::Value>>()
    };

    // import documents in batches of `document_batch_size`:
    // create a Vec to buffer documents
    let mut values = Vec::with_capacity(document_batch_size);
    // iterate over documents
    for document in documents {
        // push the document into the buffer
        values.push(document?);
        // if the buffer is full, apply the batch and reset the buffer
        if values.len() == document_batch_size {
            let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
            apply_documents_addition(write_txn, &index, batch)?;
        }
    }

    // apply the documents remaining in the buffer
    if !values.is_empty() {
        apply_documents_addition(write_txn, &index, values)?;
    }

    Ok(())
}

/// Import a backup from `backup_folder` into the database.
pub fn import_backup(
    data: &Data,
    backup_folder: &Path,
    document_batch_size: usize,
) -> Result<(), Error> {
    // create a temporary directory
    let tmp_dir = TempDir::new()?;
    let tmp_dir_path = tmp_dir.path();

    // extract the backup into the temporary directory
    compression::from_tar_gz(backup_folder, tmp_dir_path)?;

    // read the backup metadata
    let metadata = BackupMetadata::from_path(&tmp_dir_path)?;

    // choose the import function based on the `BackupVersion` in the metadata
    let import_index = match metadata.backup_version {
        BackupVersion::V1 => import_index_v1,
    };

    // remove indexes that have the same `uid` as an index to import, then create empty indexes
    let existing_index_uids = data.db.indexes_uids();
    for index in metadata.indexes.iter() {
        if existing_index_uids.contains(&index.uid) {
            data.db.delete_index(index.uid.clone())?;
        }
        index::create_index_sync(&data.db, index.uid.clone(), index.name.clone(), index.primary_key.clone())?;
    }

    // import the content of each index
    data.db.main_write::<_, _, Error>(|mut writer| {
        for index in metadata.indexes {
            import_index(&data, tmp_dir_path, &index.uid, document_batch_size, &mut writer)?;
        }
        Ok(())
    })?;

    Ok(())
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum BackupStatus {
    Done,
    Processing,
    BackupProcessFailed,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BackupInfo {
    pub uid: String,
    pub status: BackupStatus,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

impl BackupInfo {
    pub fn new(uid: String, status: BackupStatus) -> Self {
        Self { uid, status, error: None }
    }

    pub fn with_error(mut self, error: String) -> Self {
        self.status = BackupStatus::BackupProcessFailed;
        self.error = Some(error);

        self
    }

    pub fn backup_already_in_progress(&self) -> bool {
        self.status == BackupStatus::Processing
    }

    pub fn get_current() -> Option<Self> {
        BACKUP_INFO.lock().unwrap().clone()
    }

    pub fn set_current(&self) {
        *BACKUP_INFO.lock().unwrap() = Some(self.clone());
    }
}

/// Generate a uid from the creation date.
fn generate_uid() -> String {
    Utc::now().format("%Y%m%d-%H%M%S").to_string()
}

/// Build the path of the compressed backup file for `backup_uid` inside `backup_folder`.
pub fn compressed_backup_folder(backup_folder: &Path, backup_uid: &str) -> PathBuf {
    backup_folder.join(format!("{}.tar.gz", backup_uid))
}

/// Write the backup metadata.
fn backup_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
    let (db_major, db_minor, db_patch) = data.db.version();
    let metadata = BackupMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));

    metadata.to_path(folder_path)
}

/// Export the settings of the provided index into the backup.
fn backup_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
    let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;

    settings_to_path(&settings, folder_path)
}

/// Export the updates of the provided index into the backup.
fn backup_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
    let updates_path = folder_path.join("updates.jsonl");
    let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;

    let file = File::create(updates_path)?;

    for update in updates {
        serde_json::to_writer(&file, &update)?;
        writeln!(&file)?;
    }

    Ok(())
}

/// Export the documents of the provided index into the backup.
fn backup_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
    let documents_path = folder_path.join("documents.jsonl");
    let file = File::create(documents_path)?;
    let backup_batch_size = data.backup_batch_size;

    let mut offset = 0;
    loop {
        let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, backup_batch_size, None)?;
        if documents.len() == 0 { break; } else { offset += backup_batch_size; }

        for document in documents {
            serde_json::to_writer(&file, &document)?;
            writeln!(&file)?;
        }
    }

    Ok(())
}

/// Log an error with its context and mark the current backup as failed.
fn fail_backup_process<E: std::error::Error>(backup_info: BackupInfo, context: &str, error: E) {
    let error = format!("Something went wrong during backup process: {}; {}", context, error);

    error!("{}", &error);
    backup_info.with_error(error).set_current();
}

/// Main backup routine, run in a background thread.
fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: BackupInfo) {
    // open a read transaction on updates
    let update_reader = match data.db.update_read_txn() {
        Ok(r) => r,
        Err(e) => {
            fail_backup_process(backup_info, "creating RO transaction on updates", e);
            return;
        }
    };

    // open a read transaction on main
    let main_reader = match data.db.main_read_txn() {
        Ok(r) => r,
        Err(e) => {
            fail_backup_process(backup_info, "creating RO transaction on main", e);
            return;
        }
    };

    // create a temporary directory
    let tmp_dir = match TempDir::new() {
        Ok(tmp_dir) => tmp_dir,
        Err(e) => {
            fail_backup_process(backup_info, "creating temporary directory", e);
            return;
        }
    };
    let tmp_dir_path = tmp_dir.path();

    // fetch indexes
    let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
        Ok(indexes) => indexes,
        Err(e) => {
            fail_backup_process(backup_info, "listing indexes", e);
            return;
        }
    };

    // create metadata
    if let Err(e) = backup_metadata(&data, &tmp_dir_path, indexes.clone()) {
        fail_backup_process(backup_info, "generating metadata", e);
        return;
    }

    // export settings, updates and documents for each index
    for index in indexes {
        let index_path = tmp_dir_path.join(&index.uid);

        // create the index sub-directory
        if let Err(e) = create_dir_all(&index_path) {
            fail_backup_process(backup_info, &format!("creating directory for index {}", &index.uid), e);
            return;
        }

        // export settings
        if let Err(e) = backup_index_settings(&data, &main_reader, &index_path, &index.uid) {
            fail_backup_process(backup_info, &format!("generating settings for index {}", &index.uid), e);
            return;
        }

        // export documents
        if let Err(e) = backup_index_documents(&data, &main_reader, &index_path, &index.uid) {
            fail_backup_process(backup_info, &format!("generating documents for index {}", &index.uid), e);
            return;
        }

        // export updates
        if let Err(e) = backup_index_updates(&data, &update_reader, &index_path, &index.uid) {
            fail_backup_process(backup_info, &format!("generating updates for index {}", &index.uid), e);
            return;
        }
    }

    // compress the backup into a file named `{backup_uid}.tar.gz` in `backup_folder`
    if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_backup_folder(&backup_folder, &backup_info.uid)) {
        fail_backup_process(backup_info, "compressing backup", e);
        return;
    }

    // update the backup info to `done`
    let resume = BackupInfo::new(
        backup_info.uid,
        BackupStatus::Done
    );

    resume.set_current();
}

pub fn init_backup_process(data: &web::Data<Data>, backup_folder: &Path) -> Result<BackupInfo, Error> {
    create_dir_all(backup_folder).or(Err(Error::backup_failed()))?;

    // check if a backup is already in progress
    if let Some(resume) = BackupInfo::get_current() {
        if resume.backup_already_in_progress() {
            return Err(Error::backup_conflict())
        }
    }

    // generate a new backup info
    let info = BackupInfo::new(
        generate_uid(),
        BackupStatus::Processing
    );

    info.set_current();

    let data = data.clone();
    let backup_folder = backup_folder.to_path_buf();
    let info_cloned = info.clone();
    // run the backup process in a new thread
    thread::spawn(move ||
        backup_process(data, backup_folder, info_cloned)
    );

    Ok(info)
}
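For orientation, here is a minimal, illustrative sketch of the on-disk layout that `backup_process` writes and `import_backup` reads back: a `metadata.json` at the archive root plus one sub-directory per index uid containing `settings.json` and `documents.jsonl` (an `updates.jsonl` is also exported, but the V1 importer does not read it). The `movies` index, its documents, and the version strings below are invented for the example; only the file names and the camelCase metadata keys come from the code above.

// Illustrative sketch only: builds, on disk, the uncompressed layout the backup
// module works with, using made-up data. `tempfile` and `serde_json` are the
// same crates the module itself relies on.
use std::fs::{create_dir_all, File};
use std::io::Write;

use serde_json::json;
use tempfile::TempDir;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let backup = TempDir::new()?;

    // metadata.json at the root: the list of indexes, the db version and the backup version.
    let metadata = json!({
        "indexes": [{
            "name": "movies",
            "uid": "movies",
            "createdAt": "2020-09-01T00:00:00Z",
            "updatedAt": "2020-09-01T00:00:00Z",
            "primaryKey": "id",
        }],
        "dbVersion": "0.14.0",
        "backupVersion": "V1",
    });
    serde_json::to_writer(File::create(backup.path().join("metadata.json"))?, &metadata)?;

    // one sub-directory per index uid, holding that index's settings and documents.
    let index_dir = backup.path().join("movies");
    create_dir_all(&index_dir)?;
    serde_json::to_writer(File::create(index_dir.join("settings.json"))?, &json!({}))?;

    // documents.jsonl: one JSON object per line, exactly what the batch importer streams.
    let documents = File::create(index_dir.join("documents.jsonl"))?;
    for doc in [json!({"id": 1, "title": "Carol"}), json!({"id": 2, "title": "Heat"})] {
        serde_json::to_writer(&documents, &doc)?;
        writeln!(&documents)?;
    }

    println!("backup layout written to {:?}", backup.path());
    Ok(())
}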
@@ -1,5 +1,6 @@
use std::error::Error;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;

use meilisearch_core::{Database, DatabaseOptions};
@@ -25,6 +26,8 @@ impl Deref for Data {
pub struct DataInner {
    pub db: Arc<Database>,
    pub db_path: String,
    pub backup_folder: PathBuf,
    pub backup_batch_size: usize,
    pub api_keys: ApiKeys,
    pub server_pid: u32,
    pub http_payload_size_limit: usize,
@@ -57,6 +60,8 @@ impl ApiKeys {
impl Data {
    pub fn new(opt: Opt) -> Result<Data, Box<dyn Error>> {
        let db_path = opt.db_path.clone();
        let backup_folder = opt.backup_folder.clone();
        let backup_batch_size = opt.backup_batch_size;
        let server_pid = std::process::id();

        let db_opt = DatabaseOptions {
@@ -79,6 +84,8 @@ impl Data {
        let inner_data = DataInner {
            db: db.clone(),
            db_path,
            backup_folder,
            backup_batch_size,
            api_keys,
            server_pid,
            http_payload_size_limit,
@@ -41,6 +41,7 @@ pub enum Error {
    CreateIndex(String),
    DocumentNotFound(String),
    IndexNotFound(String),
    IndexAlreadyExists(String),
    Internal(String),
    InvalidIndexUid,
    InvalidToken(String),
@@ -52,6 +53,8 @@ pub enum Error {
    SearchDocuments(String),
    PayloadTooLarge,
    UnsupportedMediaType,
    BackupAlreadyInProgress,
    BackupProcessFailed,
}

impl error::Error for Error {}
@@ -65,6 +68,7 @@ impl ErrorCode for Error {
            CreateIndex(_) => Code::CreateIndex,
            DocumentNotFound(_) => Code::DocumentNotFound,
            IndexNotFound(_) => Code::IndexNotFound,
            IndexAlreadyExists(_) => Code::IndexAlreadyExists,
            Internal(_) => Code::Internal,
            InvalidIndexUid => Code::InvalidIndexUid,
            InvalidToken(_) => Code::InvalidToken,
@@ -76,6 +80,8 @@ impl ErrorCode for Error {
            SearchDocuments(_) => Code::SearchDocuments,
            PayloadTooLarge => Code::PayloadTooLarge,
            UnsupportedMediaType => Code::UnsupportedMediaType,
            BackupAlreadyInProgress => Code::BackupAlreadyInProgress,
            BackupProcessFailed => Code::BackupProcessFailed,
        }
    }
}
@@ -178,6 +184,14 @@ impl Error {
    pub fn search_documents(err: impl fmt::Display) -> Error {
        Error::SearchDocuments(err.to_string())
    }

    pub fn backup_conflict() -> Error {
        Error::BackupAlreadyInProgress
    }

    pub fn backup_failed() -> Error {
        Error::BackupProcessFailed
    }
}

impl fmt::Display for Error {
@@ -188,6 +202,7 @@ impl fmt::Display for Error {
            Self::CreateIndex(err) => write!(f, "Impossible to create index; {}", err),
            Self::DocumentNotFound(document_id) => write!(f, "Document with id {} not found", document_id),
            Self::IndexNotFound(index_uid) => write!(f, "Index {} not found", index_uid),
            Self::IndexAlreadyExists(index_uid) => write!(f, "Index {} already exists", index_uid),
            Self::Internal(err) => f.write_str(err),
            Self::InvalidIndexUid => f.write_str("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_)."),
            Self::InvalidToken(err) => write!(f, "Invalid API key: {}", err),
@@ -199,6 +214,8 @@ impl fmt::Display for Error {
            Self::SearchDocuments(err) => write!(f, "Impossible to search documents; {}", err),
            Self::PayloadTooLarge => f.write_str("Payload too large"),
            Self::UnsupportedMediaType => f.write_str("Unsupported media type"),
            Self::BackupAlreadyInProgress => f.write_str("Another backup is already in progress"),
            Self::BackupProcessFailed => f.write_str("Backup process failed"),
        }
    }
}
@@ -218,6 +235,12 @@ impl aweb::error::ResponseError for ResponseError {
    }
}

impl From<std::io::Error> for Error {
    fn from(err: std::io::Error) -> Error {
        Error::Internal(err.to_string())
    }
}

impl From<meilisearch_core::Error> for ResponseError {
    fn from(err: meilisearch_core::Error) -> ResponseError {
        ResponseError { inner: Box::new(err) }
@@ -236,14 +259,14 @@ impl From<actix_http::Error> for Error {
    }
}

impl From<std::io::Error> for Error {
    fn from(err: std::io::Error) -> Error {
impl From<meilisearch_core::Error> for Error {
    fn from(err: meilisearch_core::Error) -> Error {
        Error::Internal(err.to_string())
    }
}

impl From<meilisearch_core::Error> for Error {
    fn from(err: meilisearch_core::Error) -> Error {
impl From<serde_json::error::Error> for Error {
    fn from(err: serde_json::error::Error) -> Error {
        Error::Internal(err.to_string())
    }
}
@@ -4,7 +4,7 @@ use std::rc::Rc;
use std::task::{Context, Poll};

use actix_service::{Service, Transform};
use actix_web::{dev::ServiceRequest, dev::ServiceResponse};
use actix_web::{dev::ServiceRequest, dev::ServiceResponse, web};
use futures::future::{err, ok, Future, Ready};

use crate::error::{Error, ResponseError};
@@ -63,7 +63,7 @@ where
        let mut svc = self.service.clone();
        // This unwrap is left because this error should never appear. If it does, then
        // it means that actix-web has an issue or someone changed the type `Data`.
        let data = req.app_data::<Data>().unwrap();
        let data = req.app_data::<web::Data<Data>>().unwrap();

        if data.api_keys.master.is_none() {
            return Box::pin(svc.call(req));
meilisearch-http/src/helpers/compression.rs (new file, 27 lines)
@@ -0,0 +1,27 @@
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use std::fs::{create_dir_all, File};
use std::path::Path;
use tar::{Builder, Archive};

use crate::error::Error;

pub fn to_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> {
    let f = File::create(dest)?;
    let gz_encoder = GzEncoder::new(f, Compression::default());
    let mut tar_encoder = Builder::new(gz_encoder);
    tar_encoder.append_dir_all(".", src)?;
    let gz_encoder = tar_encoder.into_inner()?;
    gz_encoder.finish()?;
    Ok(())
}

pub fn from_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> {
    let f = File::open(src)?;
    let gz = GzDecoder::new(f);
    let mut ar = Archive::new(gz);
    create_dir_all(dest)?;
    ar.unpack(dest)?;
    Ok(())
}
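As a usage note, the two helpers above are thin wrappers over `flate2` and `tar`. The stand-alone sketch below round-trips a directory through the same pattern; the file and directory names are made up, and the error type is simplified to `io::Error` rather than the crate's `Error`.

// Stand-alone round-trip of the tar.gz pattern used by the new helpers.
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;

use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::{Archive, Builder};

fn main() -> std::io::Result<()> {
    let src = Path::new("backup_src");
    create_dir_all(src)?;
    File::create(src.join("metadata.json"))?.write_all(b"{}")?;

    // pack: the equivalent of `compression::to_tar_gz`
    let gz = GzEncoder::new(File::create("backup.tar.gz")?, Compression::default());
    let mut tar = Builder::new(gz);
    tar.append_dir_all(".", src)?;
    tar.into_inner()?.finish()?;

    // unpack: the equivalent of `compression::from_tar_gz`
    let dest = Path::new("backup_out");
    create_dir_all(dest)?;
    Archive::new(GzDecoder::new(File::open("backup.tar.gz")?)).unpack(dest)?;
    Ok(())
}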
@@ -1,6 +1,7 @@
pub mod authentication;
pub mod meilisearch;
pub mod normalize_path;
pub mod compression;

pub use authentication::Authentication;
pub use normalize_path::NormalizePath;
@@ -8,6 +8,7 @@ pub mod option;
pub mod routes;
pub mod analytics;
pub mod snapshot;
pub mod backup;

use actix_http::Error;
use actix_service::ServiceFactory;
@@ -34,7 +35,7 @@ pub fn create_app(
    actix_http::body::Body,
> {
    App::new()
        .app_data(web::Data::new(data.clone()))
        .data(data.clone())
        .app_data(
            web::JsonConfig::default()
                .limit(data.http_payload_size_limit)
@@ -56,6 +57,7 @@ pub fn create_app(
        .configure(routes::health::services)
        .configure(routes::stats::services)
        .configure(routes::key::services)
        .configure(routes::backup::services)
}

pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpdateResult) {
@@ -6,7 +6,7 @@ use main_error::MainError;
use meilisearch_http::helpers::NormalizePath;
use meilisearch_http::{create_app, index_update_callback, Data, Opt};
use structopt::StructOpt;
use meilisearch_http::snapshot;
use meilisearch_http::{snapshot, backup};

mod analytics;

@@ -14,7 +14,7 @@ mod analytics;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

#[actix_rt::main]
#[actix_web::main]
async fn main() -> Result<(), MainError> {
    let opt = Opt::from_args();

@@ -69,6 +69,11 @@ async fn main() -> Result<(), MainError> {
        index_update_callback(name, &data_cloned, status);
    }));

    if let Some(path) = &opt.import_backup {
        backup::import_backup(&data, path, opt.backup_batch_size)?;
    }

    if let Some(path) = &opt.snapshot_path {
        snapshot::schedule_snapshot(data.clone(), &path, opt.snapshot_interval_sec.unwrap_or(86400))?;
    }
@@ -115,6 +115,18 @@ pub struct Opt {
    /// Defines time interval, in seconds, between each snapshot creation.
    #[structopt(long, requires = "snapshot-path", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
    pub snapshot_interval_sec: Option<u64>,

    /// Folder where backups are created when the backup route is called.
    #[structopt(long, env = "MEILI_BACKUP_FOLDER", default_value = "backups/")]
    pub backup_folder: PathBuf,

    /// Import a backup from the specified path; must be a `.tar.gz` file.
    #[structopt(long, env = "MEILI_IMPORT_BACKUP", conflicts_with = "load-from-snapshot")]
    pub import_backup: Option<PathBuf>,

    /// The batch size used when importing or exporting a backup; a bigger batch size makes the process faster.
    #[structopt(long, env = "MEILI_BACKUP_BATCH_SIZE", default_value = "1024")]
    pub backup_batch_size: usize,
}

impl Opt {
meilisearch-http/src/routes/backup.rs (new file, 64 lines)
@@ -0,0 +1,64 @@
use std::fs::File;
use std::path::Path;

use actix_web::{get, post};
use actix_web::{HttpResponse, web};
use serde::{Deserialize, Serialize};

use crate::backup::{BackupInfo, BackupStatus, compressed_backup_folder, init_backup_process};
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::Authentication;

pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(trigger_backup)
        .service(get_backup_status);
}

#[post("/backups", wrap = "Authentication::Private")]
async fn trigger_backup(
    data: web::Data<Data>,
) -> Result<HttpResponse, ResponseError> {
    let backup_folder = Path::new(&data.backup_folder);
    match init_backup_process(&data, &backup_folder) {
        Ok(resume) => Ok(HttpResponse::Accepted().json(resume)),
        Err(e) => Err(e.into())
    }
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct BackupStatusResponse {
    status: String,
}

#[derive(Deserialize)]
struct BackupParam {
    backup_uid: String,
}

#[get("/backups/{backup_uid}/status", wrap = "Authentication::Private")]
async fn get_backup_status(
    data: web::Data<Data>,
    path: web::Path<BackupParam>,
) -> Result<HttpResponse, ResponseError> {
    let backup_folder = Path::new(&data.backup_folder);
    let backup_uid = &path.backup_uid;

    if let Some(resume) = BackupInfo::get_current() {
        if &resume.uid == backup_uid {
            return Ok(HttpResponse::Ok().json(resume));
        }
    }

    if File::open(compressed_backup_folder(Path::new(backup_folder), backup_uid)).is_ok() {
        let resume = BackupInfo::new(
            backup_uid.into(),
            BackupStatus::Done
        );

        Ok(HttpResponse::Ok().json(resume))
    } else {
        Err(Error::not_found("backup does not exist").into())
    }
}
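The two routes above can be exercised with any HTTP client. Below is a hypothetical client-side sketch that triggers a backup and polls its status; the host, port, API key, and the use of `reqwest` (with its `blocking` and `json` features) plus `serde_json` are assumptions for illustration and are not part of this change.

// Hypothetical client for the two new routes; host, port and key are made up.
use std::{thread, time::Duration};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();
    let base = "http://127.0.0.1:7700";

    // POST /backups answers 202 Accepted with { "uid": "...", "status": "processing" }.
    let info: serde_json::Value = client
        .post(format!("{}/backups", base))
        .header("X-Meili-API-Key", "private-key")
        .send()?
        .json()?;
    let uid = info["uid"].as_str().unwrap_or_default().to_string();

    // Poll GET /backups/{uid}/status until the background thread reports a final state.
    loop {
        let status: serde_json::Value = client
            .get(format!("{}/backups/{}/status", base, uid))
            .header("X-Meili-API-Key", "private-key")
            .send()?
            .json()?;
        if status["status"] != "processing" {
            println!("backup {}: {}", uid, status["status"]);
            break;
        }
        thread::sleep(Duration::from_millis(500));
    }
    Ok(())
}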
@@ -1,11 +1,11 @@
use std::collections::{BTreeSet, HashSet};

use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use actix_web_macros::{delete, get, post, put};
use indexmap::IndexMap;
use meilisearch_core::update;
use serde::Deserialize;
use meilisearch_core::{update, MainReader};
use serde_json::Value;
use serde::Deserialize;

use crate::Data;
use crate::error::{Error, ResponseError};
@@ -85,41 +85,61 @@ struct BrowseQuery {
    attributes_to_retrieve: Option<String>,
}

pub fn get_all_documents_sync(
    data: &web::Data<Data>,
    reader: &MainReader,
    index_uid: &str,
    offset: usize,
    limit: usize,
    attributes_to_retrieve: Option<&String>
) -> Result<Vec<Document>, Error> {
    let index = data
        .db
        .open_index(index_uid)
        .ok_or(Error::index_not_found(index_uid))?;

    let documents_ids: Result<BTreeSet<_>, _> = index
        .documents_fields_counts
        .documents_ids(reader)?
        .skip(offset)
        .take(limit)
        .collect();

    let attributes: Option<HashSet<&str>> = attributes_to_retrieve
        .map(|a| a.split(',').collect());

    let mut documents = Vec::new();
    for document_id in documents_ids? {
        if let Ok(Some(document)) =
            index.document::<Document>(reader, attributes.as_ref(), document_id)
        {
            documents.push(document);
        }
    }

    Ok(documents)
}

#[get("/indexes/{index_uid}/documents", wrap = "Authentication::Public")]
async fn get_all_documents(
    data: web::Data<Data>,
    path: web::Path<IndexParam>,
    params: web::Query<BrowseQuery>,
) -> Result<HttpResponse, ResponseError> {
    let index = data
        .db
        .open_index(&path.index_uid)
        .ok_or(Error::index_not_found(&path.index_uid))?;

    let offset = params.offset.unwrap_or(0);
    let limit = params.limit.unwrap_or(20);

    let index_uid = &path.index_uid;
    let reader = data.db.main_read_txn()?;
    let documents_ids: Result<BTreeSet<_>, _> = index
        .documents_fields_counts
        .documents_ids(&reader)?
        .skip(offset)
        .take(limit)
        .collect();

    let attributes: Option<HashSet<&str>> = params
        .attributes_to_retrieve
        .as_ref()
        .map(|a| a.split(',').collect());

    let mut documents = Vec::new();
    for document_id in documents_ids? {
        if let Ok(Some(document)) =
            index.document::<Document>(&reader, attributes.as_ref(), document_id)
        {
            documents.push(document);
        }
    }

    let documents = get_all_documents_sync(
        &data,
        &reader,
        index_uid,
        offset,
        limit,
        params.attributes_to_retrieve.as_ref()
    )?;

    Ok(HttpResponse::Ok().json(documents))
}
@@ -1,5 +1,5 @@
use actix_web::{web, HttpResponse};
use actix_web_macros::{get, put};
use actix_web::{get, put};
use serde::Deserialize;

use crate::error::{Error, ResponseError};
@@ -1,14 +1,16 @@
use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use actix_web_macros::{delete, get, post, put};
use chrono::{DateTime, Utc};
use log::error;
use meilisearch_core::{Database, MainReader, UpdateReader};
use meilisearch_core::update::UpdateStatus;
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};

use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::Authentication;
use crate::routes::IndexParam;
use crate::Data;

pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(list_indexes)
@@ -29,19 +31,17 @@ fn generate_uid() -> String {
        .collect()
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
struct IndexResponse {
    name: String,
    uid: String,
pub struct IndexResponse {
    pub name: String,
    pub uid: String,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
    primary_key: Option<String>,
    pub primary_key: Option<String>,
}

#[get("/indexes", wrap = "Authentication::Private")]
async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
    let reader = data.db.main_read_txn()?;
pub fn list_indexes_sync(data: &web::Data<Data>, reader: &MainReader) -> Result<Vec<IndexResponse>, ResponseError> {
    let mut indexes = Vec::new();

    for index_uid in data.db.indexes_uids() {
@@ -49,23 +49,23 @@ async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {

        match index {
            Some(index) => {
                let name = index.main.name(&reader)?.ok_or(Error::internal(
                let name = index.main.name(reader)?.ok_or(Error::internal(
                    "Impossible to get the name of an index",
                ))?;
                let created_at = index
                    .main
                    .created_at(&reader)?
                    .created_at(reader)?
                    .ok_or(Error::internal(
                        "Impossible to get the create date of an index",
                    ))?;
                let updated_at = index
                    .main
                    .updated_at(&reader)?
                    .updated_at(reader)?
                    .ok_or(Error::internal(
                        "Impossible to get the last update date of an index",
                    ))?;

                let primary_key = match index.main.schema(&reader) {
                let primary_key = match index.main.schema(reader) {
                    Ok(Some(schema)) => match schema.primary_key() {
                        Some(primary_key) => Some(primary_key.to_owned()),
                        None => None,
@@ -89,6 +89,14 @@ async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
        }
    }

    Ok(indexes)
}

#[get("/indexes", wrap = "Authentication::Private")]
async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
    let reader = data.db.main_read_txn()?;
    let indexes = list_indexes_sync(&data, &reader)?;

    Ok(HttpResponse::Ok().json(indexes))
}

@@ -145,6 +153,55 @@ struct IndexCreateRequest {
    primary_key: Option<String>,
}

pub fn create_index_sync(
    database: &std::sync::Arc<Database>,
    uid: String,
    name: String,
    primary_key: Option<String>,
) -> Result<IndexResponse, Error> {

    let created_index = database
        .create_index(&uid)
        .map_err(|e| match e {
            meilisearch_core::Error::IndexAlreadyExists => Error::IndexAlreadyExists(uid.clone()),
            _ => Error::create_index(e)
        })?;

    let index_response = database.main_write::<_, _, Error>(|mut write_txn| {
        created_index.main.put_name(&mut write_txn, &name)?;

        let created_at = created_index
            .main
            .created_at(&write_txn)?
            .ok_or(Error::internal("Impossible to read created at"))?;

        let updated_at = created_index
            .main
            .updated_at(&write_txn)?
            .ok_or(Error::internal("Impossible to read updated at"))?;

        if let Some(id) = primary_key.clone() {
            if let Some(mut schema) = created_index.main.schema(&write_txn)? {
                schema
                    .set_primary_key(&id)
                    .map_err(Error::bad_request)?;
                created_index.main.put_schema(&mut write_txn, &schema)?;
            }
        }
        let index_response = IndexResponse {
            name,
            uid,
            created_at,
            updated_at,
            primary_key,
        };
        Ok(index_response)
    })?;

    Ok(index_response)
}

#[post("/indexes", wrap = "Authentication::Private")]
async fn create_index(
    data: web::Data<Data>,
@@ -175,45 +232,9 @@ async fn create_index(
        },
    };

    let created_index = data
        .db
        .create_index(&uid)
        .map_err(|e| match e {
            meilisearch_core::Error::IndexAlreadyExists => e.into(),
            _ => ResponseError::from(Error::create_index(e))
        })?;
    let name = body.name.as_ref().unwrap_or(&uid).to_string();

    let index_response = data.db.main_write::<_, _, ResponseError>(|mut writer| {
        let name = body.name.as_ref().unwrap_or(&uid);
        created_index.main.put_name(&mut writer, name)?;

        let created_at = created_index
            .main
            .created_at(&writer)?
            .ok_or(Error::internal("Impossible to read created at"))?;

        let updated_at = created_index
            .main
            .updated_at(&writer)?
            .ok_or(Error::internal("Impossible to read updated at"))?;

        if let Some(id) = body.primary_key.clone() {
            if let Some(mut schema) = created_index.main.schema(&writer)? {
                schema
                    .set_primary_key(&id)
                    .map_err(Error::bad_request)?;
                created_index.main.put_schema(&mut writer, &schema)?;
            }
        }
        let index_response = IndexResponse {
            name: name.to_string(),
            uid,
            created_at,
            updated_at,
            primary_key: body.primary_key.clone(),
        };
        Ok(index_response)
    })?;
    let index_response = create_index_sync(&data.db, uid, name, body.primary_key.clone())?;

    Ok(HttpResponse::Created().json(index_response))
}
@@ -340,20 +361,28 @@ async fn get_update_status(
    )).into()),
    }
}
pub fn get_all_updates_status_sync(
    data: &web::Data<Data>,
    reader: &UpdateReader,
    index_uid: &str,
) -> Result<Vec<UpdateStatus>, Error> {
    let index = data
        .db
        .open_index(index_uid)
        .ok_or(Error::index_not_found(index_uid))?;

    Ok(index.all_updates_status(reader)?)
}

#[get("/indexes/{index_uid}/updates", wrap = "Authentication::Private")]
async fn get_all_updates_status(
    data: web::Data<Data>,
    path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
    let index = data
        .db
        .open_index(&path.index_uid)
        .ok_or(Error::index_not_found(&path.index_uid))?;

    let reader = data.db.update_read_txn()?;

    let response = index.all_updates_status(&reader)?;
    let response = get_all_updates_status_sync(&data, &reader, &path.index_uid)?;

    Ok(HttpResponse::Ok().json(response))
}
@@ -1,6 +1,6 @@
use actix_web::web;
use actix_web::HttpResponse;
use actix_web_macros::get;
use actix_web::get;
use serde::Serialize;

use crate::helpers::Authentication;
@@ -10,6 +10,7 @@ pub mod setting;
pub mod stats;
pub mod stop_words;
pub mod synonym;
pub mod backup;

#[derive(Deserialize)]
pub struct IndexParam {
@@ -1,9 +1,7 @@
use std::collections::{HashSet, HashMap};
use std::collections::{HashMap, HashSet};

use actix_web::{get, post, web, HttpResponse};
use log::warn;
use actix_web::web;
use actix_web::HttpResponse;
use actix_web_macros::{get, post};
use serde::{Deserialize, Serialize};
use serde_json::Value;

@@ -14,11 +12,10 @@ use crate::routes::IndexParam;
use crate::Data;

use meilisearch_core::facets::FacetFilter;
use meilisearch_schema::{Schema, FieldId};
use meilisearch_schema::{FieldId, Schema};

pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(search_with_post)
        .service(search_with_url_query);
    cfg.service(search_with_post).service(search_with_url_query);
}

#[derive(Serialize, Deserialize)]
@@ -93,7 +90,11 @@ async fn search_with_post(
}

impl SearchQuery {
    fn search(&self, index_uid: &str, data: web::Data<Data>) -> Result<SearchResult, ResponseError> {
    fn search(
        &self,
        index_uid: &str,
        data: web::Data<Data>,
    ) -> Result<SearchResult, ResponseError> {
        let index = data
            .db
            .open_index(index_uid)
@@ -105,7 +106,12 @@ impl SearchQuery {
            .schema(&reader)?
            .ok_or(Error::internal("Impossible to retrieve the schema"))?;

        let mut search_builder = index.new_search(self.q.clone());
        let query = self
            .q
            .clone()
            .and_then(|q| if q.is_empty() { None } else { Some(q) });

        let mut search_builder = index.new_search(query);

        if let Some(offset) = self.offset {
            search_builder.offset(offset);
@@ -118,7 +124,8 @@ impl SearchQuery {
        let mut restricted_attributes: HashSet<&str>;
        match &self.attributes_to_retrieve {
            Some(attributes_to_retrieve) => {
                let attributes_to_retrieve: HashSet<&str> = attributes_to_retrieve.split(',').collect();
                let attributes_to_retrieve: HashSet<&str> =
                    attributes_to_retrieve.split(',').collect();
                if attributes_to_retrieve.contains("*") {
                    restricted_attributes = available_attributes.clone();
                } else {
@@ -132,15 +139,22 @@ impl SearchQuery {
                }
            }
        }
            },
            }
            None => {
                restricted_attributes = available_attributes.clone();
            }
        }

        if let Some(ref facet_filters) = self.facet_filters {
            let attrs = index.main.attributes_for_faceting(&reader)?.unwrap_or_default();
            search_builder.add_facet_filters(FacetFilter::from_str(facet_filters, &schema, &attrs)?);
            let attrs = index
                .main
                .attributes_for_faceting(&reader)?
                .unwrap_or_default();
            search_builder.add_facet_filters(FacetFilter::from_str(
                facet_filters,
                &schema,
                &attrs,
            )?);
        }

        if let Some(facets) = &self.facets_distribution {
@@ -148,7 +162,7 @@ impl SearchQuery {
                Some(ref attrs) => {
                    let field_ids = prepare_facet_list(&facets, &schema, attrs)?;
                    search_builder.add_facets(field_ids);
                },
                }
                None => return Err(FacetCountError::NoFacetSet.into()),
            }
        }
@@ -160,20 +174,23 @@ impl SearchQuery {
            for attribute in attributes_to_crop.split(',') {
                let mut attribute = attribute.split(':');
                let attr = attribute.next();
                let length = attribute.next().and_then(|s| s.parse().ok()).unwrap_or(default_length);
                let length = attribute
                    .next()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(default_length);
                match attr {
                    Some("*") => {
                        for attr in &restricted_attributes {
                            final_attributes.insert(attr.to_string(), length);
                        }
                    },
                    }
                    Some(attr) => {
                        if available_attributes.contains(attr) {
                            final_attributes.insert(attr.to_string(), length);
                        } else {
                            warn!("The attributes {:?} present in attributesToCrop parameter doesn't exist", attr);
                        }
                    },
                    }
                    None => (),
                }
            }
@@ -215,7 +232,11 @@ impl SearchQuery {
///
/// An error is returned if the array is malformed, or if it contains attributes that
/// do not exist or are not set as facets.
fn prepare_facet_list(facets: &str, schema: &Schema, facet_attrs: &[FieldId]) -> Result<Vec<(FieldId, String)>, FacetCountError> {
fn prepare_facet_list(
    facets: &str,
    schema: &Schema,
    facet_attrs: &[FieldId],
) -> Result<Vec<(FieldId, String)>, FacetCountError> {
    let json_array = serde_json::from_str(facets)?;
    match json_array {
        Value::Array(vals) => {
@@ -243,6 +264,6 @@ fn prepare_facet_list(facets: &str, schema: &Schema, facet_attrs: &[FieldId]) -> Result<Vec<(FieldId, String)>, FacetCountError> {
            }
            Ok(field_ids)
        }
        bad_val => Err(FacetCountError::unexpected_token(bad_val, &["[String]"]))
        bad_val => Err(FacetCountError::unexpected_token(bad_val, &["[String]"])),
    }
}
@@ -1,13 +1,15 @@
use std::collections::{BTreeMap, BTreeSet, HashSet};

use actix_web::{delete, get, post};
use actix_web::{web, HttpResponse};
use actix_web_macros::{delete, get, post};
use meilisearch_core::{MainReader, UpdateWriter};
use meilisearch_core::settings::{Settings, SettingsUpdate, UpdateState, DEFAULT_RANKING_RULES};
use meilisearch_schema::Schema;
use std::collections::{BTreeMap, BTreeSet};

use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::Authentication;
use crate::routes::{IndexParam, IndexUpdateResponse};
use crate::Data;

pub fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(update_all)
@@ -30,73 +32,77 @@ pub fn services(cfg: &mut web::ServiceConfig) {
        .service(update_attributes_for_faceting);
}

pub fn update_all_settings_txn(
    data: &web::Data<Data>,
    settings: SettingsUpdate,
    index_uid: &str,
    write_txn: &mut UpdateWriter,
) -> Result<u64, Error> {
    let index = data
        .db
        .open_index(index_uid)
        .ok_or(Error::index_not_found(index_uid))?;

    let update_id = index.settings_update(write_txn, settings)?;
    Ok(update_id)
}

#[post("/indexes/{index_uid}/settings", wrap = "Authentication::Private")]
async fn update_all(
    data: web::Data<Data>,
    path: web::Path<IndexParam>,
    body: web::Json<Settings>,
) -> Result<HttpResponse, ResponseError> {
    let index = data
        .db
        .open_index(&path.index_uid)
        .ok_or(Error::index_not_found(&path.index_uid))?;
    let settings = body
        .into_inner()
        .to_update()
        .map_err(Error::bad_request)?;

    let update_id = data.db.update_write::<_, _, ResponseError>(|writer| {
        let settings = body
            .into_inner()
            .to_update()
            .map_err(Error::bad_request)?;
        let update_id = index.settings_update(writer, settings)?;
        Ok(update_id)
    let update_id = data.db.update_write::<_, _, Error>(|writer| {
        update_all_settings_txn(&data, settings, &path.index_uid, writer)
    })?;

    Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)))
}

#[get("/indexes/{index_uid}/settings", wrap = "Authentication::Private")]
async fn get_all(
    data: web::Data<Data>,
    path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
pub fn get_all_sync(data: &web::Data<Data>, reader: &MainReader, index_uid: &str) -> Result<Settings, Error> {
    let index = data
        .db
        .open_index(&path.index_uid)
        .ok_or(Error::index_not_found(&path.index_uid))?;

    let reader = data.db.main_read_txn()?;
        .open_index(index_uid)
        .ok_or(Error::index_not_found(index_uid))?;

    let stop_words: BTreeSet<String> = index
        .main
        .stop_words(&reader)?
        .stop_words(reader)?
        .into_iter()
        .collect();

    let synonyms_list = index.main.synonyms(&reader)?;
    let synonyms_list = index.main.synonyms(reader)?;

    let mut synonyms = BTreeMap::new();
    let index_synonyms = &index.synonyms;
    for synonym in synonyms_list {
        let list = index_synonyms.synonyms(&reader, synonym.as_bytes())?;
        let list = index_synonyms.synonyms(reader, synonym.as_bytes())?;
        synonyms.insert(synonym, list);
    }

    let ranking_rules = index
        .main
        .ranking_rules(&reader)?
        .ranking_rules(reader)?
        .unwrap_or(DEFAULT_RANKING_RULES.to_vec())
        .into_iter()
        .map(|r| r.to_string())
        .collect();

    let schema = index.main.schema(&reader)?;
    let schema = index.main.schema(reader)?;

    let distinct_attribute = match (index.main.distinct_attribute(&reader)?, &schema) {
    let distinct_attribute = match (index.main.distinct_attribute(reader)?, &schema) {
        (Some(id), Some(schema)) => schema.name(id).map(str::to_string),
        _ => None,
    };

    let attributes_for_faceting = match (&schema, &index.main.attributes_for_faceting(&reader)?) {
    let attributes_for_faceting = match (&schema, &index.main.attributes_for_faceting(reader)?) {
        (Some(schema), Some(attrs)) => {
            attrs
                .iter()
@@ -110,7 +116,7 @@ async fn get_all(
    let searchable_attributes = schema.as_ref().map(get_indexed_attributes);
    let displayed_attributes = schema.as_ref().map(get_displayed_attributes);

    let settings = Settings {
    Ok(Settings {
        ranking_rules: Some(Some(ranking_rules)),
        distinct_attribute: Some(distinct_attribute),
        searchable_attributes: Some(searchable_attributes),
@@ -118,7 +124,16 @@ async fn get_all(
        stop_words: Some(Some(stop_words)),
        synonyms: Some(Some(synonyms)),
        attributes_for_faceting: Some(Some(attributes_for_faceting)),
    };
    })
}

#[get("/indexes/{index_uid}/settings", wrap = "Authentication::Private")]
async fn get_all(
    data: web::Data<Data>,
    path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
    let reader = data.db.main_read_txn()?;
    let settings = get_all_sync(&data, &reader, &path.index_uid)?;

    Ok(HttpResponse::Ok().json(settings))
}
@@ -2,7 +2,7 @@ use std::collections::{HashMap, BTreeMap};

use actix_web::web;
use actix_web::HttpResponse;
use actix_web_macros::get;
use actix_web::get;
use chrono::{DateTime, Utc};
use log::error;
use serde::Serialize;
@@ -1,5 +1,5 @@
use actix_web::{web, HttpResponse};
use actix_web_macros::{delete, get, post};
use actix_web::{delete, get, post};
use meilisearch_core::settings::{SettingsUpdate, UpdateState};
use std::collections::BTreeSet;
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use actix_web::{web, HttpResponse};
use actix_web_macros::{delete, get, post};
use actix_web::{delete, get, post};
use indexmap::IndexMap;
use meilisearch_core::settings::{SettingsUpdate, UpdateState};
@@ -1,42 +1,14 @@
use crate::Data;
use crate::error::Error;
use crate::helpers::compression;

use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use log::error;
use std::fs::{create_dir_all, File};
use std::io;
use std::fs::create_dir_all;
use std::path::Path;
use std::thread;
use std::time::{Duration};
use tar::{Builder, Archive};
use tempfile::TempDir;

fn pack(src: &Path, dest: &Path) -> io::Result<()> {
    let f = File::create(dest)?;
    let gz_encoder = GzEncoder::new(f, Compression::default());

    let mut tar_encoder = Builder::new(gz_encoder);
    tar_encoder.append_dir_all(".", src)?;
    let gz_encoder = tar_encoder.into_inner()?;

    gz_encoder.finish()?;

    Ok(())
}

fn unpack(src: &Path, dest: &Path) -> Result<(), Error> {
    let f = File::open(src)?;
    let gz = GzDecoder::new(f);
    let mut ar = Archive::new(gz);

    create_dir_all(dest)?;
    ar.unpack(dest)?;

    Ok(())
}

pub fn load_snapshot(
    db_path: &str,
    snapshot_path: &Path,
@@ -46,7 +18,7 @@ pub fn load_snapshot(
    let db_path = Path::new(db_path);

    if !db_path.exists() && snapshot_path.exists() {
        unpack(snapshot_path, db_path)
        compression::from_tar_gz(snapshot_path, db_path)
    } else if db_path.exists() && !ignore_snapshot_if_db_exists {
        Err(Error::Internal(format!("database already exists at {:?}", db_path)))
    } else if !snapshot_path.exists() && !ignore_missing_snapshot {
@@ -61,7 +33,7 @@ pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {

    data.db.copy_and_compact_to_path(tmp_dir.path())?;

    pack(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e))))
    compression::to_tar_gz(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e))))
}

pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
@@ -102,11 +74,11 @@ mod tests {
        let file_2_relative = Path::new("subfolder/file2.txt");

        create_dir_all(src_dir.join(subfolder_relative)).unwrap();
        File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
        File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
        fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
        fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();

        assert!(pack(&src_dir, &archive_path).is_ok());
        assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok());
        assert!(archive_path.exists());
        assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());