Implement backups

* trigger backup creation via an HTTP route
* follow backup progress with a status route
* import a backup via the command line
* let the user choose the batch size of documents to import (command line)

closes #884
closes #840
many 2020-07-28 14:41:49 +02:00
parent efe5984d54
commit c254320860
No known key found for this signature in database
GPG key ID: 2CEF23B75189EACA
26 changed files with 1313 additions and 167 deletions


@ -0,0 +1,416 @@
use std::fs::{create_dir_all, File};
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;
use actix_web::web;
use chrono::offset::Utc;
use indexmap::IndexMap;
use log::error;
use meilisearch_core::{MainWriter, MainReader, UpdateReader};
use meilisearch_core::settings::Settings;
use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use crate::Data;
use crate::error::Error;
use crate::helpers::compression;
use crate::routes::index;
use crate::routes::index::IndexResponse;
// Mutex to share backup progress.
static BACKUP_INFO: Lazy<Mutex<Option<BackupInfo>>> = Lazy::new(Mutex::default);
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum BackupVersion {
V1,
}
impl BackupVersion {
const CURRENT: Self = Self::V1;
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BackupMetadata {
indexes: Vec<crate::routes::index::IndexResponse>,
db_version: String,
backup_version: BackupVersion,
}
impl BackupMetadata {
/// Create a BackupMetadata with the current backup version of meilisearch.
pub fn new(indexes: Vec<crate::routes::index::IndexResponse>, db_version: String) -> Self {
BackupMetadata {
indexes,
db_version,
backup_version: BackupVersion::CURRENT,
}
}
/// Extract BackupMetadata from `metadata.json` file present at provided `folder_path`
fn from_path(folder_path: &Path) -> Result<Self, Error> {
let path = folder_path.join("metadata.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write BackupMetadata in `metadata.json` file at provided `folder_path`
fn to_path(&self, folder_path: &Path) -> Result<(), Error> {
let path = folder_path.join("metadata.json");
let file = File::create(path)?;
serde_json::to_writer(file, &self)?;
Ok(())
}
}
/// Extract Settings from `settings.json` file present at provided `folder_path`
fn settings_from_path(folder_path: &Path) -> Result<Settings, Error> {
let path = folder_path.join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write Settings in `settings.json` file at provided `folder_path`
fn settings_to_path(settings: &Settings, folder_path: &Path) -> Result<(), Error> {
let path = folder_path.join("settings.json");
let file = File::create(path)?;
serde_json::to_writer(file, settings)?;
Ok(())
}
/// Import the settings and documents of a backup with version `BackupVersion::V1` into the specified index.
fn import_index_v1(
data: &Data,
backup_folder: &Path,
index_uid: &str,
document_batch_size: usize,
write_txn: &mut MainWriter,
) -> Result<(), Error> {
// open index
let index = data
.db
.open_index(index_uid)
.ok_or(Error::index_not_found(index_uid))?;
// index folder path in backup folder
let index_path = &backup_folder.join(index_uid);
// extract `settings.json` file and import content
let settings = settings_from_path(&index_path)?;
let settings = settings.to_update().or_else(|_e| Err(Error::backup_failed()))?;
apply_settings_update(write_txn, &index, settings)?;
// create iterator over documents in `documents.jsonl` to make batch importation
let documents = {
let file = File::open(&index_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
let deserializer = serde_json::Deserializer::from_reader(reader);
deserializer.into_iter::<IndexMap<String, serde_json::Value>>()
};
// import documents in batches of `document_batch_size`:
// create a Vec to buffer documents
let mut values = Vec::with_capacity(document_batch_size);
// iterate over documents
for document in documents {
// push document in buffer
values.push(document?);
// if the buffer is full, apply it as a batch, then start a new buffer
if values.len() == document_batch_size {
let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
apply_documents_addition(write_txn, &index, batch)?;
}
}
// apply documents remaining in the buffer
if !values.is_empty() {
apply_documents_addition(write_txn, &index, values)?;
}
Ok(())
}
/// Import the backup archive located at `backup_folder` into the database.
pub fn import_backup(
data: &Data,
backup_folder: &Path,
document_batch_size: usize,
) -> Result<(), Error> {
// create a temporary directory
let tmp_dir = TempDir::new()?;
let tmp_dir_path = tmp_dir.path();
// extract backup in temporary directory
compression::from_tar_gz(backup_folder, tmp_dir_path)?;
// read backup metadata
let metadata = BackupMetadata::from_path(&tmp_dir_path)?;
// choose the import function based on the BackupVersion found in the metadata
let import_index = match metadata.backup_version {
BackupVersion::V1 => import_index_v1,
};
// delete any index whose `uid` matches an index to import, then create empty indexes
let existing_index_uids = data.db.indexes_uids();
for index in metadata.indexes.iter() {
if existing_index_uids.contains(&index.uid) {
data.db.delete_index(index.uid.clone())?;
}
index::create_index_sync(&data.db, index.uid.clone(), index.name.clone(), index.primary_key.clone())?;
}
// import the content of each index
data.db.main_write::<_, _, Error>(|mut writer| {
for index in metadata.indexes {
import_index(&data, tmp_dir_path, &index.uid, document_batch_size, &mut writer)?;
}
Ok(())
})?;
Ok(())
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum BackupStatus {
Done,
Processing,
BackupProcessFailed,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BackupInfo {
pub uid: String,
pub status: BackupStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl BackupInfo {
pub fn new(uid: String, status: BackupStatus) -> Self {
Self { uid, status, error: None }
}
pub fn with_error(mut self, error: String) -> Self {
self.status = BackupStatus::BackupProcessFailed;
self.error = Some(error);
self
}
pub fn backup_already_in_progress(&self) -> bool {
self.status == BackupStatus::Processing
}
pub fn get_current() -> Option<Self> {
BACKUP_INFO.lock().unwrap().clone()
}
pub fn set_current(&self) {
*BACKUP_INFO.lock().unwrap() = Some(self.clone());
}
}
/// Generate a uid from the creation date, e.g. `20200728-144149`
fn generate_uid() -> String {
Utc::now().format("%Y%m%d-%H%M%S").to_string()
}
/// Build the path of the compressed backup file, `{backup_uid}.tar.gz`, inside `backup_folder`
pub fn compressed_backup_folder(backup_folder: &Path, backup_uid: &str) -> PathBuf {
backup_folder.join(format!("{}.tar.gz", backup_uid))
}
/// Write metadata in backup
fn backup_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
let (db_major, db_minor, db_patch) = data.db.version();
let metadata = BackupMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));
metadata.to_path(folder_path)
}
/// Export settings of provided index in backup
fn backup_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;
settings_to_path(&settings, folder_path)
}
/// Export updates of provided index in backup
fn backup_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
let updates_path = folder_path.join("updates.jsonl");
let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;
let file = File::create(updates_path)?;
for update in updates {
serde_json::to_writer(&file, &update)?;
writeln!(&file)?;
}
Ok(())
}
/// Export documents of provided index in backup
fn backup_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
let documents_path = folder_path.join("documents.jsonl");
let file = File::create(documents_path)?;
let backup_batch_size = data.backup_batch_size;
let mut offset = 0;
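// page through the index `backup_batch_size` documents at a time; stop at the first empty page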
loop {
let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, backup_batch_size, None)?;
if documents.is_empty() { break; } else { offset += backup_batch_size; }
for document in documents {
serde_json::to_writer(&file, &document)?;
writeln!(&file)?;
}
}
Ok(())
}
/// Log an error with its context and record the backup process as failed.
fn fail_backup_process<E: std::error::Error>(backup_info: BackupInfo, context: &str, error: E) {
let error = format!("Something went wrong during backup process: {}; {}", context, error);
error!("{}", &error);
backup_info.with_error(error).set_current();
}
/// Main body of the backup process, run in a dedicated thread.
fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: BackupInfo) {
// open read transaction on Update
let update_reader = match data.db.update_read_txn() {
Ok(r) => r,
Err(e) => {
fail_backup_process(backup_info, "creating RO transaction on updates", e);
return ;
}
};
// open read transaction on Main
let main_reader = match data.db.main_read_txn() {
Ok(r) => r,
Err(e) => {
fail_backup_process(backup_info, "creating RO transaction on main", e);
return ;
}
};
// create a temporary directory
let tmp_dir = match TempDir::new() {
Ok(tmp_dir) => tmp_dir,
Err(e) => {
fail_backup_process(backup_info, "creating temporary directory", e);
return ;
}
};
let tmp_dir_path = tmp_dir.path();
// fetch indexes
let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
Ok(indexes) => indexes,
Err(e) => {
fail_backup_process(backup_info, "listing indexes", e);
return ;
}
};
// create metadata
if let Err(e) = backup_metadata(&data, &tmp_dir_path, indexes.clone()) {
fail_backup_process(backup_info, "generating metadata", e);
return ;
}
// export settings, updates and documents for each index
for index in indexes {
let index_path = tmp_dir_path.join(&index.uid);
// create the index sub-directory
if let Err(e) = create_dir_all(&index_path) {
fail_backup_process(backup_info, &format!("creating directory for index {}", &index.uid), e);
return ;
}
// export settings
if let Err(e) = backup_index_settings(&data, &main_reader, &index_path, &index.uid) {
fail_backup_process(backup_info, &format!("generating settings for index {}", &index.uid), e);
return ;
}
// export documents
if let Err(e) = backup_index_documents(&data, &main_reader, &index_path, &index.uid) {
fail_backup_process(backup_info, &format!("generating documents for index {}", &index.uid), e);
return ;
}
// export updates
if let Err(e) = backup_index_updates(&data, &update_reader, &index_path, &index.uid) {
fail_backup_process(backup_info, &format!("generating updates for index {}", &index.uid), e);
return ;
}
}
// compress backup in a file named `{backup_uid}.tar.gz` in `backup_folder`
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_backup_folder(&backup_folder, &backup_info.uid)) {
fail_backup_process(backup_info, "compressing backup", e);
return ;
}
// update backup info to `done`
let resume = BackupInfo::new(
backup_info.uid,
BackupStatus::Done
);
resume.set_current();
}
pub fn init_backup_process(data: &web::Data<Data>, backup_folder: &Path) -> Result<BackupInfo, Error> {
create_dir_all(backup_folder).or(Err(Error::backup_failed()))?;
// check if a backup is already in progress
if let Some(resume) = BackupInfo::get_current() {
if resume.backup_already_in_progress() {
return Err(Error::backup_conflict())
}
}
// generate a new backup info
let info = BackupInfo::new(
generate_uid(),
BackupStatus::Processing
);
info.set_current();
let data = data.clone();
let backup_folder = backup_folder.to_path_buf();
let info_cloned = info.clone();
// run backup process in a new thread
thread::spawn(move ||
backup_process(data, backup_folder, info_cloned)
);
Ok(info)
}
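For orientation, a minimal sketch (not part of the commit, identifiers taken from this module) of how the two entry points above are meant to be driven: the HTTP route hands `init_backup_process` the actix-web state, while the CLI path calls `import_backup` with the plain `Data` built at startup.

// Hypothetical caller, sketching the intended flow only.
// HTTP path (`data: web::Data<Data>`): spawn the backup process and return its initial info,
// exactly what the POST /backups route does.
let info = init_backup_process(&data, &data.backup_folder)?;
assert_eq!(info.status, BackupStatus::Processing);

// Progress is published through the shared BACKUP_INFO mutex, so any thread can poll it,
// as the status route does.
if let Some(current) = BackupInfo::get_current() {
    println!("backup {} is {:?}", current.uid, current.status);
}

// CLI path (`data: Data`, `opt: Opt`): synchronously import a previously created
// `.tar.gz` backup before the HTTP server starts, as `--import-backup` does in main.rs.
if let Some(path) = &opt.import_backup {
    import_backup(&data, path, opt.backup_batch_size)?;
}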


@ -1,5 +1,6 @@
use std::error::Error;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use meilisearch_core::{Database, DatabaseOptions};
@ -25,6 +26,8 @@ impl Deref for Data {
pub struct DataInner {
pub db: Arc<Database>,
pub db_path: String,
pub backup_folder: PathBuf,
pub backup_batch_size: usize,
pub api_keys: ApiKeys,
pub server_pid: u32,
pub http_payload_size_limit: usize,
@ -57,6 +60,8 @@ impl ApiKeys {
impl Data {
pub fn new(opt: Opt) -> Result<Data, Box<dyn Error>> {
let db_path = opt.db_path.clone();
let backup_folder = opt.backup_folder.clone();
let backup_batch_size = opt.backup_batch_size;
let server_pid = std::process::id();
let db_opt = DatabaseOptions {
@ -79,6 +84,8 @@ impl Data {
let inner_data = DataInner {
db: db.clone(),
db_path,
backup_folder,
backup_batch_size,
api_keys,
server_pid,
http_payload_size_limit,


@ -41,6 +41,7 @@ pub enum Error {
CreateIndex(String),
DocumentNotFound(String),
IndexNotFound(String),
IndexAlreadyExists(String),
Internal(String),
InvalidIndexUid,
InvalidToken(String),
@ -52,6 +53,8 @@ pub enum Error {
SearchDocuments(String),
PayloadTooLarge,
UnsupportedMediaType,
BackupAlreadyInProgress,
BackupProcessFailed,
}
impl error::Error for Error {}
@ -65,6 +68,7 @@ impl ErrorCode for Error {
CreateIndex(_) => Code::CreateIndex,
DocumentNotFound(_) => Code::DocumentNotFound,
IndexNotFound(_) => Code::IndexNotFound,
IndexAlreadyExists(_) => Code::IndexAlreadyExists,
Internal(_) => Code::Internal,
InvalidIndexUid => Code::InvalidIndexUid,
InvalidToken(_) => Code::InvalidToken,
@ -76,6 +80,8 @@ impl ErrorCode for Error {
SearchDocuments(_) => Code::SearchDocuments,
PayloadTooLarge => Code::PayloadTooLarge,
UnsupportedMediaType => Code::UnsupportedMediaType,
BackupAlreadyInProgress => Code::BackupAlreadyInProgress,
BackupProcessFailed => Code::BackupProcessFailed,
}
}
}
@ -178,6 +184,14 @@ impl Error {
pub fn search_documents(err: impl fmt::Display) -> Error {
Error::SearchDocuments(err.to_string())
}
pub fn backup_conflict() -> Error {
Error::BackupAlreadyInProgress
}
pub fn backup_failed() -> Error {
Error::BackupProcessFailed
}
}
impl fmt::Display for Error {
@ -188,6 +202,7 @@ impl fmt::Display for Error {
Self::CreateIndex(err) => write!(f, "Impossible to create index; {}", err),
Self::DocumentNotFound(document_id) => write!(f, "Document with id {} not found", document_id),
Self::IndexNotFound(index_uid) => write!(f, "Index {} not found", index_uid),
Self::IndexAlreadyExists(index_uid) => write!(f, "Index {} already exists", index_uid),
Self::Internal(err) => f.write_str(err),
Self::InvalidIndexUid => f.write_str("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_)."),
Self::InvalidToken(err) => write!(f, "Invalid API key: {}", err),
@ -199,6 +214,8 @@ impl fmt::Display for Error {
Self::SearchDocuments(err) => write!(f, "Impossible to search documents; {}", err),
Self::PayloadTooLarge => f.write_str("Payload too large"),
Self::UnsupportedMediaType => f.write_str("Unsupported media type"),
Self::BackupAlreadyInProgress => f.write_str("Another backup is already in progress"),
Self::BackupProcessFailed => f.write_str("Backup process failed"),
}
}
}
@ -218,6 +235,12 @@ impl aweb::error::ResponseError for ResponseError {
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
Error::Internal(err.to_string())
}
}
impl From<meilisearch_core::Error> for ResponseError {
fn from(err: meilisearch_core::Error) -> ResponseError {
ResponseError { inner: Box::new(err) }
@ -236,14 +259,14 @@ impl From<actix_http::Error> for Error {
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Error {
impl From<meilisearch_core::Error> for Error {
fn from(err: meilisearch_core::Error) -> Error {
Error::Internal(err.to_string())
}
}
impl From<meilisearch_core::Error> for Error {
fn from(err: meilisearch_core::Error) -> Error {
impl From<serde_json::error::Error> for Error {
fn from(err: serde_json::error::Error) -> Error {
Error::Internal(err.to_string())
}
}


@ -0,0 +1,27 @@
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use std::fs::{create_dir_all, File};
use std::path::Path;
use tar::{Builder, Archive};
use crate::error::Error;
pub fn to_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> {
let f = File::create(dest)?;
let gz_encoder = GzEncoder::new(f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
Ok(())
}
pub fn from_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> {
let f = File::open(src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(dest)?;
ar.unpack(dest)?;
Ok(())
}
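A quick usage sketch of these helpers (paths hypothetical): `to_tar_gz` packs the contents of a directory into a single `.tar.gz` archive, and `from_tar_gz` unpacks such an archive, creating the destination directory if needed.

use std::path::Path;
use crate::error::Error;
use crate::helpers::compression::{to_tar_gz, from_tar_gz};

fn compression_roundtrip() -> Result<(), Error> {
    // pack the temporary backup directory into a single archive...
    to_tar_gz(Path::new("backup-tmp"), Path::new("backups/20200728-144149.tar.gz"))?;
    // ...and later restore it somewhere else
    from_tar_gz(Path::new("backups/20200728-144149.tar.gz"), Path::new("restored"))?;
    Ok(())
}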


@ -1,6 +1,7 @@
pub mod authentication;
pub mod meilisearch;
pub mod normalize_path;
pub mod compression;
pub use authentication::Authentication;
pub use normalize_path::NormalizePath;


@ -8,6 +8,7 @@ pub mod option;
pub mod routes;
pub mod analytics;
pub mod snapshot;
pub mod backup;
use actix_http::Error;
use actix_service::ServiceFactory;
@ -56,6 +57,7 @@ pub fn create_app(
.configure(routes::health::services)
.configure(routes::stats::services)
.configure(routes::key::services)
.configure(routes::backup::services)
}
pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpdateResult) {


@ -6,7 +6,7 @@ use main_error::MainError;
use meilisearch_http::helpers::NormalizePath;
use meilisearch_http::{create_app, index_update_callback, Data, Opt};
use structopt::StructOpt;
use meilisearch_http::snapshot;
use meilisearch_http::{snapshot, backup};
mod analytics;
@ -69,6 +69,11 @@ async fn main() -> Result<(), MainError> {
index_update_callback(name, &data_cloned, status);
}));
if let Some(path) = &opt.import_backup {
backup::import_backup(&data, path, opt.backup_batch_size)?;
}
if let Some(path) = &opt.snapshot_path {
snapshot::schedule_snapshot(data.clone(), &path, opt.snapshot_interval_sec.unwrap_or(86400))?;
}


@ -115,6 +115,18 @@ pub struct Opt {
/// Defines time interval, in seconds, between each snapshot creation.
#[structopt(long, requires = "snapshot-path", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
pub snapshot_interval_sec: Option<u64>,
/// Folder where backups are created when the backup route is called.
#[structopt(long, env = "MEILI_backup_folder", default_value = "backups/")]
pub backup_folder: PathBuf,
/// Import a backup from the specified path; must be a `.tar.gz` file.
#[structopt(long, env = "MEILI_IMPORT_BACKUP", conflicts_with = "load-from-snapshot")]
pub import_backup: Option<PathBuf>,
/// The batch size used when importing and exporting documents; the bigger it is, the faster the process.
#[structopt(long, env = "MEILI_BACKUP_BATCH_SIZE", default_value = "1024")]
pub backup_batch_size: usize,
}
impl Opt {


@ -0,0 +1,64 @@
use std::fs::File;
use std::path::Path;
use actix_web::{get, post};
use actix_web::{HttpResponse, web};
use serde::{Deserialize, Serialize};
use crate::backup::{BackupInfo, BackupStatus, compressed_backup_folder, init_backup_process};
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::Authentication;
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(trigger_backup)
.service(get_backup_status);
}
#[post("/backups", wrap = "Authentication::Private")]
async fn trigger_backup(
data: web::Data<Data>,
) -> Result<HttpResponse, ResponseError> {
let backup_folder = Path::new(&data.backup_folder);
match init_backup_process(&data, &backup_folder) {
Ok(resume) => Ok(HttpResponse::Accepted().json(resume)),
Err(e) => Err(e.into())
}
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct BackupStatusResponse {
status: String,
}
#[derive(Deserialize)]
struct BackupParam {
backup_uid: String,
}
#[get("/backups/{backup_uid}/status", wrap = "Authentication::Private")]
async fn get_backup_status(
data: web::Data<Data>,
path: web::Path<BackupParam>,
) -> Result<HttpResponse, ResponseError> {
let backup_folder = Path::new(&data.backup_folder);
let backup_uid = &path.backup_uid;
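// a backup still in progress (or one that just failed) is reported from the shared in-memory info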
if let Some(resume) = BackupInfo::get_current() {
if &resume.uid == backup_uid {
return Ok(HttpResponse::Ok().json(resume));
}
}
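// otherwise, a finished backup is recognized by the presence of its archive on disk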
if File::open(compressed_backup_folder(backup_folder, backup_uid)).is_ok() {
let resume = BackupInfo::new(
backup_uid.into(),
BackupStatus::Done
);
Ok(HttpResponse::Ok().json(resume))
} else {
Err(Error::not_found("backup does not exist").into())
}
}
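Given the serde attributes on `BackupInfo` (`camelCase` fields) and `BackupStatus` (`snake_case` variants), the payloads returned by these routes look roughly as follows (uid value hypothetical):

let info = BackupInfo::new("20200728-144149".to_string(), BackupStatus::Processing);
// serializes to {"uid":"20200728-144149","status":"processing"}
println!("{}", serde_json::to_string(&info).unwrap());
// after a failure the `error` field is filled in and the status becomes:
// {"uid":"20200728-144149","status":"backup_process_failed","error":"..."}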


@ -1,11 +1,11 @@
use std::collections::{BTreeSet, HashSet};
use actix_web::{web, HttpResponse};
use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use indexmap::IndexMap;
use meilisearch_core::update;
use serde::Deserialize;
use meilisearch_core::{update, MainReader};
use serde_json::Value;
use serde::Deserialize;
use crate::Data;
use crate::error::{Error, ResponseError};
@ -85,41 +85,61 @@ struct BrowseQuery {
attributes_to_retrieve: Option<String>,
}
pub fn get_all_documents_sync(
data: &web::Data<Data>,
reader: &MainReader,
index_uid: &str,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<&String>
) -> Result<Vec<Document>, Error> {
let index = data
.db
.open_index(index_uid)
.ok_or(Error::index_not_found(index_uid))?;
let documents_ids: Result<BTreeSet<_>, _> = index
.documents_fields_counts
.documents_ids(reader)?
.skip(offset)
.take(limit)
.collect();
let attributes: Option<HashSet<&str>> = attributes_to_retrieve
.map(|a| a.split(',').collect());
let mut documents = Vec::new();
for document_id in documents_ids? {
if let Ok(Some(document)) =
index.document::<Document>(reader, attributes.as_ref(), document_id)
{
documents.push(document);
}
}
Ok(documents)
}
#[get("/indexes/{index_uid}/documents", wrap = "Authentication::Public")]
async fn get_all_documents(
data: web::Data<Data>,
path: web::Path<IndexParam>,
params: web::Query<BrowseQuery>,
) -> Result<HttpResponse, ResponseError> {
let index = data
.db
.open_index(&path.index_uid)
.ok_or(Error::index_not_found(&path.index_uid))?;
let offset = params.offset.unwrap_or(0);
let limit = params.limit.unwrap_or(20);
let index_uid = &path.index_uid;
let reader = data.db.main_read_txn()?;
let documents_ids: Result<BTreeSet<_>, _> = index
.documents_fields_counts
.documents_ids(&reader)?
.skip(offset)
.take(limit)
.collect();
let attributes: Option<HashSet<&str>> = params
.attributes_to_retrieve
.as_ref()
.map(|a| a.split(',').collect());
let mut documents = Vec::new();
for document_id in documents_ids? {
if let Ok(Some(document)) =
index.document::<Document>(&reader, attributes.as_ref(), document_id)
{
documents.push(document);
}
}
let documents = get_all_documents_sync(
&data,
&reader,
index_uid,
offset,
limit,
params.attributes_to_retrieve.as_ref()
)?;
Ok(HttpResponse::Ok().json(documents))
}


@ -1,14 +1,16 @@
use actix_web::{web, HttpResponse};
use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use chrono::{DateTime, Utc};
use log::error;
use meilisearch_core::{Database, MainReader, UpdateReader};
use meilisearch_core::update::UpdateStatus;
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::Authentication;
use crate::routes::IndexParam;
use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(list_indexes)
@ -29,19 +31,17 @@ fn generate_uid() -> String {
.collect()
}
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
struct IndexResponse {
name: String,
uid: String,
pub struct IndexResponse {
pub name: String,
pub uid: String,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
primary_key: Option<String>,
pub primary_key: Option<String>,
}
#[get("/indexes", wrap = "Authentication::Private")]
async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
let reader = data.db.main_read_txn()?;
pub fn list_indexes_sync(data: &web::Data<Data>, reader: &MainReader) -> Result<Vec<IndexResponse>, ResponseError> {
let mut indexes = Vec::new();
for index_uid in data.db.indexes_uids() {
@ -49,23 +49,23 @@ async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseErr
match index {
Some(index) => {
let name = index.main.name(&reader)?.ok_or(Error::internal(
let name = index.main.name(reader)?.ok_or(Error::internal(
"Impossible to get the name of an index",
))?;
let created_at = index
.main
.created_at(&reader)?
.created_at(reader)?
.ok_or(Error::internal(
"Impossible to get the create date of an index",
))?;
let updated_at = index
.main
.updated_at(&reader)?
.updated_at(reader)?
.ok_or(Error::internal(
"Impossible to get the last update date of an index",
))?;
let primary_key = match index.main.schema(&reader) {
let primary_key = match index.main.schema(reader) {
Ok(Some(schema)) => match schema.primary_key() {
Some(primary_key) => Some(primary_key.to_owned()),
None => None,
@ -89,6 +89,14 @@ async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseErr
}
}
Ok(indexes)
}
#[get("/indexes", wrap = "Authentication::Private")]
async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
let reader = data.db.main_read_txn()?;
let indexes = list_indexes_sync(&data, &reader)?;
Ok(HttpResponse::Ok().json(indexes))
}
@ -145,6 +153,55 @@ struct IndexCreateRequest {
primary_key: Option<String>,
}
pub fn create_index_sync(
database: &std::sync::Arc<Database>,
uid: String,
name: String,
primary_key: Option<String>,
) -> Result<IndexResponse, Error> {
let created_index = database
.create_index(&uid)
.map_err(|e| match e {
meilisearch_core::Error::IndexAlreadyExists => Error::IndexAlreadyExists(uid.clone()),
_ => Error::create_index(e)
})?;
let index_response = database.main_write::<_, _, Error>(|mut write_txn| {
created_index.main.put_name(&mut write_txn, &name)?;
let created_at = created_index
.main
.created_at(&write_txn)?
.ok_or(Error::internal("Impossible to read created at"))?;
let updated_at = created_index
.main
.updated_at(&write_txn)?
.ok_or(Error::internal("Impossible to read updated at"))?;
if let Some(id) = primary_key.clone() {
if let Some(mut schema) = created_index.main.schema(&write_txn)? {
schema
.set_primary_key(&id)
.map_err(Error::bad_request)?;
created_index.main.put_schema(&mut write_txn, &schema)?;
}
}
let index_response = IndexResponse {
name,
uid,
created_at,
updated_at,
primary_key,
};
Ok(index_response)
})?;
Ok(index_response)
}
#[post("/indexes", wrap = "Authentication::Private")]
async fn create_index(
data: web::Data<Data>,
@ -175,45 +232,9 @@ async fn create_index(
},
};
let created_index = data
.db
.create_index(&uid)
.map_err(|e| match e {
meilisearch_core::Error::IndexAlreadyExists => e.into(),
_ => ResponseError::from(Error::create_index(e))
})?;
let name = body.name.as_ref().unwrap_or(&uid).to_string();
let index_response = data.db.main_write::<_, _, ResponseError>(|mut writer| {
let name = body.name.as_ref().unwrap_or(&uid);
created_index.main.put_name(&mut writer, name)?;
let created_at = created_index
.main
.created_at(&writer)?
.ok_or(Error::internal("Impossible to read created at"))?;
let updated_at = created_index
.main
.updated_at(&writer)?
.ok_or(Error::internal("Impossible to read updated at"))?;
if let Some(id) = body.primary_key.clone() {
if let Some(mut schema) = created_index.main.schema(&writer)? {
schema
.set_primary_key(&id)
.map_err(Error::bad_request)?;
created_index.main.put_schema(&mut writer, &schema)?;
}
}
let index_response = IndexResponse {
name: name.to_string(),
uid,
created_at,
updated_at,
primary_key: body.primary_key.clone(),
};
Ok(index_response)
})?;
let index_response = create_index_sync(&data.db, uid, name, body.primary_key.clone())?;
Ok(HttpResponse::Created().json(index_response))
}
@ -340,20 +361,28 @@ async fn get_update_status(
)).into()),
}
}
pub fn get_all_updates_status_sync(
data: &web::Data<Data>,
reader: &UpdateReader,
index_uid: &str,
) -> Result<Vec<UpdateStatus>, Error> {
let index = data
.db
.open_index(index_uid)
.ok_or(Error::index_not_found(index_uid))?;
Ok(index.all_updates_status(reader)?)
}
#[get("/indexes/{index_uid}/updates", wrap = "Authentication::Private")]
async fn get_all_updates_status(
data: web::Data<Data>,
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
let index = data
.db
.open_index(&path.index_uid)
.ok_or(Error::index_not_found(&path.index_uid))?;
let reader = data.db.update_read_txn()?;
let response = index.all_updates_status(&reader)?;
let response = get_all_updates_status_sync(&data, &reader, &path.index_uid)?;
Ok(HttpResponse::Ok().json(response))
}


@ -10,6 +10,7 @@ pub mod setting;
pub mod stats;
pub mod stop_words;
pub mod synonym;
pub mod backup;
#[derive(Deserialize)]
pub struct IndexParam {


@ -1,13 +1,15 @@
use actix_web::{web, HttpResponse};
use actix_web::{delete, get, post};
use meilisearch_core::settings::{Settings, SettingsUpdate, UpdateState, DEFAULT_RANKING_RULES};
use meilisearch_schema::Schema;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use actix_web::{delete, get, post};
use actix_web::{web, HttpResponse};
use meilisearch_core::{MainReader, UpdateWriter};
use meilisearch_core::settings::{Settings, SettingsUpdate, UpdateState, DEFAULT_RANKING_RULES};
use meilisearch_schema::Schema;
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::Authentication;
use crate::routes::{IndexParam, IndexUpdateResponse};
use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(update_all)
@ -30,73 +32,77 @@ pub fn services(cfg: &mut web::ServiceConfig) {
.service(update_attributes_for_faceting);
}
pub fn update_all_settings_txn(
data: &web::Data<Data>,
settings: SettingsUpdate,
index_uid: &str,
write_txn: &mut UpdateWriter,
) -> Result<u64, Error> {
let index = data
.db
.open_index(index_uid)
.ok_or(Error::index_not_found(index_uid))?;
let update_id = index.settings_update(write_txn, settings)?;
Ok(update_id)
}
#[post("/indexes/{index_uid}/settings", wrap = "Authentication::Private")]
async fn update_all(
data: web::Data<Data>,
path: web::Path<IndexParam>,
body: web::Json<Settings>,
) -> Result<HttpResponse, ResponseError> {
let index = data
.db
.open_index(&path.index_uid)
.ok_or(Error::index_not_found(&path.index_uid))?;
let settings = body
.into_inner()
.to_update()
.map_err(Error::bad_request)?;
let update_id = data.db.update_write::<_, _, ResponseError>(|writer| {
let settings = body
.into_inner()
.to_update()
.map_err(Error::bad_request)?;
let update_id = index.settings_update(writer, settings)?;
Ok(update_id)
let update_id = data.db.update_write::<_, _, Error>(|writer| {
update_all_settings_txn(&data, settings, &path.index_uid, writer)
})?;
Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)))
}
#[get("/indexes/{index_uid}/settings", wrap = "Authentication::Private")]
async fn get_all(
data: web::Data<Data>,
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
pub fn get_all_sync(data: &web::Data<Data>, reader: &MainReader, index_uid: &str) -> Result<Settings, Error> {
let index = data
.db
.open_index(&path.index_uid)
.ok_or(Error::index_not_found(&path.index_uid))?;
let reader = data.db.main_read_txn()?;
.open_index(index_uid)
.ok_or(Error::index_not_found(index_uid))?;
let stop_words: BTreeSet<String> = index
.main
.stop_words(&reader)?
.stop_words(reader)?
.into_iter()
.collect();
let synonyms_list = index.main.synonyms(&reader)?;
let synonyms_list = index.main.synonyms(reader)?;
let mut synonyms = BTreeMap::new();
let index_synonyms = &index.synonyms;
for synonym in synonyms_list {
let list = index_synonyms.synonyms(&reader, synonym.as_bytes())?;
let list = index_synonyms.synonyms(reader, synonym.as_bytes())?;
synonyms.insert(synonym, list);
}
let ranking_rules = index
.main
.ranking_rules(&reader)?
.ranking_rules(reader)?
.unwrap_or(DEFAULT_RANKING_RULES.to_vec())
.into_iter()
.map(|r| r.to_string())
.collect();
let schema = index.main.schema(&reader)?;
let schema = index.main.schema(reader)?;
let distinct_attribute = match (index.main.distinct_attribute(&reader)?, &schema) {
let distinct_attribute = match (index.main.distinct_attribute(reader)?, &schema) {
(Some(id), Some(schema)) => schema.name(id).map(str::to_string),
_ => None,
};
let attributes_for_faceting = match (&schema, &index.main.attributes_for_faceting(&reader)?) {
let attributes_for_faceting = match (&schema, &index.main.attributes_for_faceting(reader)?) {
(Some(schema), Some(attrs)) => {
attrs
.iter()
@ -110,7 +116,7 @@ async fn get_all(
let searchable_attributes = schema.as_ref().map(get_indexed_attributes);
let displayed_attributes = schema.as_ref().map(get_displayed_attributes);
let settings = Settings {
Ok(Settings {
ranking_rules: Some(Some(ranking_rules)),
distinct_attribute: Some(distinct_attribute),
searchable_attributes: Some(searchable_attributes),
@ -118,7 +124,16 @@ async fn get_all(
stop_words: Some(Some(stop_words)),
synonyms: Some(Some(synonyms)),
attributes_for_faceting: Some(Some(attributes_for_faceting)),
};
})
}
#[get("/indexes/{index_uid}/settings", wrap = "Authentication::Private")]
async fn get_all(
data: web::Data<Data>,
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
let reader = data.db.main_read_txn()?;
let settings = get_all_sync(&data, &reader, &path.index_uid)?;
Ok(HttpResponse::Ok().json(settings))
}


@ -1,42 +1,14 @@
use crate::Data;
use crate::error::Error;
use crate::helpers::compression;
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use log::error;
use std::fs::{create_dir_all, File};
use std::io;
use std::fs::create_dir_all;
use std::path::Path;
use std::thread;
use std::time::{Duration};
use tar::{Builder, Archive};
use tempfile::TempDir;
fn pack(src: &Path, dest: &Path) -> io::Result<()> {
let f = File::create(dest)?;
let gz_encoder = GzEncoder::new(f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
Ok(())
}
fn unpack(src: &Path, dest: &Path) -> Result<(), Error> {
let f = File::open(src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(dest)?;
ar.unpack(dest)?;
Ok(())
}
pub fn load_snapshot(
db_path: &str,
snapshot_path: &Path,
@ -46,7 +18,7 @@ pub fn load_snapshot(
let db_path = Path::new(db_path);
if !db_path.exists() && snapshot_path.exists() {
unpack(snapshot_path, db_path)
compression::from_tar_gz(snapshot_path, db_path)
} else if db_path.exists() && !ignore_snapshot_if_db_exists {
Err(Error::Internal(format!("database already exists at {:?}", db_path)))
} else if !snapshot_path.exists() && !ignore_missing_snapshot {
@ -61,7 +33,7 @@ pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {
data.db.copy_and_compact_to_path(tmp_dir.path())?;
pack(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e))))
compression::to_tar_gz(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e))))
}
pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
@ -102,11 +74,11 @@ mod tests {
let file_2_relative = Path::new("subfolder/file2.txt");
create_dir_all(src_dir.join(subfolder_relative)).unwrap();
File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
assert!(pack(&src_dir, &archive_path).is_ok());
assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok());
assert!(archive_path.exists());
assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());