mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 11:57:07 +02:00
start integrating the index-scheduler in meilisearch-lib
This commit is contained in:
parent
01847a14bb
commit
acc6d3a82b
7 changed files with 163 additions and 146 deletions
|
@ -1,39 +1,30 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::io::Cursor;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::error::PayloadError;
|
||||
use bytes::Bytes;
|
||||
use file_store::FileStore;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use index_scheduler::IndexScheduler;
|
||||
use index_scheduler::TaskKind;
|
||||
use index_scheduler::task::{Status, Task};
|
||||
use index_scheduler::{IndexScheduler, KindWithContent, TaskId, TaskView};
|
||||
use meilisearch_auth::SearchRules;
|
||||
use meilisearch_types::index_uid::IndexUid;
|
||||
use milli::update::{IndexDocumentsMethod, IndexerConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::time::sleep;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::document_formats::{read_csv, read_json, read_ndjson};
|
||||
// use crate::dump::{self, load_dump, DumpHandler};
|
||||
use crate::options::{IndexerOpts, SchedulerConfig};
|
||||
use crate::snapshot::{load_snapshot, SnapshotService};
|
||||
// use crate::snapshot::{load_snapshot, SnapshotService};
|
||||
use error::Result;
|
||||
use index::{
|
||||
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
|
||||
Checked, Document, Index, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
|
||||
};
|
||||
|
||||
use self::error::IndexControllerError;
|
||||
|
||||
pub mod error;
|
||||
pub mod versioning;
|
||||
|
||||
|
@ -187,7 +178,6 @@ impl IndexControllerBuilder {
|
|||
|
||||
let meta_env = Arc::new(open_meta_env(db_path.as_ref(), task_store_size)?);
|
||||
|
||||
let file_store = FileStore::new(&db_path)?;
|
||||
// Create or overwrite the version file for this DB
|
||||
versioning::create_version_file(db_path.as_ref())?;
|
||||
|
||||
|
@ -204,13 +194,15 @@ impl IndexControllerBuilder {
|
|||
max_positions_per_attributes: None,
|
||||
};
|
||||
|
||||
let scheduler = IndexScheduler::new(
|
||||
db_path.as_ref().to_path_buf(),
|
||||
let index_scheduler = IndexScheduler::new(
|
||||
db_path.as_ref().join("tasks"),
|
||||
db_path.as_ref().join("update_files"),
|
||||
db_path.as_ref().join("indexes"),
|
||||
index_size,
|
||||
indexer_config,
|
||||
file_store,
|
||||
);
|
||||
)?;
|
||||
|
||||
/*
|
||||
if self.schedule_snapshot {
|
||||
let snapshot_period = self
|
||||
.snapshot_interval
|
||||
|
@ -230,10 +222,9 @@ impl IndexControllerBuilder {
|
|||
|
||||
tokio::task::spawn_local(snapshot_service.run());
|
||||
}
|
||||
*/
|
||||
|
||||
Ok(Meilisearch {
|
||||
index_scheduler: scheduler,
|
||||
})
|
||||
Ok(Meilisearch { index_scheduler })
|
||||
}
|
||||
|
||||
/// Set the index controller builder's max update store size.
|
||||
|
@ -318,100 +309,25 @@ impl Meilisearch {
|
|||
IndexControllerBuilder::default()
|
||||
}
|
||||
|
||||
pub async fn register_task(&self, task: TaskKind) -> Result<Task> {
|
||||
Ok(self.index_scheduler.register(task).await?)
|
||||
pub async fn register_task(&self, task: KindWithContent) -> Result<TaskView> {
|
||||
let this = self.clone();
|
||||
Ok(
|
||||
tokio::task::spawn_blocking(move || this.clone().index_scheduler.register(task))
|
||||
.await??,
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
|
||||
let task = self.scheduler.read().await.get_task(id, filter).await?;
|
||||
Ok(task)
|
||||
pub async fn get_task(&self, id: TaskId) -> Result<TaskView> {
|
||||
Ok(self.index_scheduler.task(id)?)
|
||||
}
|
||||
|
||||
pub async fn get_index_task(&self, index_uid: String, task_id: TaskId) -> Result<Task> {
|
||||
let creation_task_id = self
|
||||
.index_resolver
|
||||
.get_index_creation_task_id(index_uid.clone())
|
||||
.await?;
|
||||
if task_id < creation_task_id {
|
||||
return Err(TaskError::UnexistingTask(task_id).into());
|
||||
}
|
||||
|
||||
let mut filter = TaskFilter::default();
|
||||
filter.filter_index(index_uid);
|
||||
let task = self
|
||||
.scheduler
|
||||
.read()
|
||||
.await
|
||||
.get_task(task_id, Some(filter))
|
||||
.await?;
|
||||
|
||||
Ok(task)
|
||||
pub async fn list_tasks(&self, filter: index_scheduler::Query) -> Result<Vec<TaskView>> {
|
||||
Ok(self.index_scheduler.get_tasks(filter)?)
|
||||
}
|
||||
|
||||
pub async fn list_tasks(
|
||||
&self,
|
||||
filter: Option<TaskFilter>,
|
||||
limit: Option<usize>,
|
||||
offset: Option<TaskId>,
|
||||
) -> Result<Vec<Task>> {
|
||||
let tasks = self
|
||||
.scheduler
|
||||
.read()
|
||||
.await
|
||||
.list_tasks(offset, filter, limit)
|
||||
.await?;
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
pub async fn list_index_task(
|
||||
&self,
|
||||
index_uid: String,
|
||||
limit: Option<usize>,
|
||||
offset: Option<TaskId>,
|
||||
) -> Result<Vec<Task>> {
|
||||
let task_id = self
|
||||
.index_resolver
|
||||
.get_index_creation_task_id(index_uid.clone())
|
||||
.await?;
|
||||
|
||||
let mut filter = TaskFilter::default();
|
||||
filter.filter_index(index_uid);
|
||||
|
||||
let tasks = self
|
||||
.scheduler
|
||||
.read()
|
||||
.await
|
||||
.list_tasks(
|
||||
Some(offset.unwrap_or_default() + task_id),
|
||||
Some(filter),
|
||||
limit,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
pub async fn list_indexes(&self) -> Result<Vec<IndexMetadata>> {
|
||||
let indexes = self.index_resolver.list().await?;
|
||||
let mut ret = Vec::new();
|
||||
for (uid, index) in indexes {
|
||||
let meta = index.meta()?;
|
||||
let meta = IndexMetadata {
|
||||
uuid: index.uuid(),
|
||||
uid,
|
||||
meta,
|
||||
};
|
||||
ret.push(meta);
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn settings(&self, uid: String) -> Result<Settings<Checked>> {
|
||||
let index = self.index_resolver.get_index(uid).await?;
|
||||
let settings = spawn_blocking(move || index.settings()).await??;
|
||||
Ok(settings)
|
||||
pub async fn list_indexes(&self) -> Result<Vec<Index>> {
|
||||
let this = self.clone();
|
||||
Ok(spawn_blocking(move || this.index_scheduler.indexes()).await??)
|
||||
}
|
||||
|
||||
/// Return the total number of documents contained in the index, along with the selected documents.
|
||||
|
@ -422,11 +338,12 @@ impl Meilisearch {
|
|||
limit: usize,
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> Result<(u64, Vec<Document>)> {
|
||||
let index = self.index_resolver.get_index(uid).await?;
|
||||
let result =
|
||||
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
|
||||
.await??;
|
||||
Ok(result)
|
||||
let this = self.clone();
|
||||
spawn_blocking(move || -> Result<_> {
|
||||
let index = this.index_scheduler.index(&uid)?;
|
||||
Ok(index.retrieve_documents(offset, limit, attributes_to_retrieve)?)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub async fn document(
|
||||
|
@ -435,35 +352,38 @@ impl Meilisearch {
|
|||
doc_id: String,
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> Result<Document> {
|
||||
let index = self.index_resolver.get_index(uid).await?;
|
||||
let document =
|
||||
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
|
||||
.await??;
|
||||
Ok(document)
|
||||
let this = self.clone();
|
||||
spawn_blocking(move || -> Result<_> {
|
||||
let index = this.index_scheduler.index(&uid)?;
|
||||
Ok(index.retrieve_document(doc_id, attributes_to_retrieve)?)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
|
||||
let index = self.index_resolver.get_index(uid).await?;
|
||||
let result = spawn_blocking(move || index.perform_search(query)).await??;
|
||||
Ok(result)
|
||||
let this = self.clone();
|
||||
spawn_blocking(move || -> Result<_> {
|
||||
let index = this.index_scheduler.index(&uid)?;
|
||||
Ok(index.perform_search(query)?)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
|
||||
let index = self.index_resolver.get_index(uid.clone()).await?;
|
||||
let uuid = index.uuid();
|
||||
let meta = spawn_blocking(move || index.meta()).await??;
|
||||
let meta = IndexMetadata { uuid, uid, meta };
|
||||
Ok(meta)
|
||||
pub async fn get_index(&self, uid: String) -> Result<Index> {
|
||||
let this = self.clone();
|
||||
Ok(spawn_blocking(move || this.index_scheduler.index(&uid)).await??)
|
||||
}
|
||||
|
||||
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
|
||||
let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?;
|
||||
let processing_tasks = self
|
||||
.index_scheduler
|
||||
.get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?;
|
||||
// Check if the currently indexing update is from our index.
|
||||
let is_indexing = processing_tasks
|
||||
.first()
|
||||
.map_or(false, |task| task.index_uid().map_or(false, |u| u == uid));
|
||||
let is_indexing = processing_tasks.first().map_or(false, |task| {
|
||||
task.index_uid.as_ref().map_or(false, |u| u == &uid)
|
||||
});
|
||||
|
||||
let index = self.index_resolver.get_index(uid).await?;
|
||||
let index = self.get_index(uid).await?;
|
||||
let mut stats = spawn_blocking(move || index.stats()).await??;
|
||||
stats.is_indexing = Some(is_indexing);
|
||||
|
||||
|
@ -474,12 +394,15 @@ impl Meilisearch {
|
|||
let mut last_task: Option<OffsetDateTime> = None;
|
||||
let mut indexes = BTreeMap::new();
|
||||
let mut database_size = 0;
|
||||
let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?;
|
||||
let processing_tasks = self
|
||||
.index_scheduler
|
||||
.get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?;
|
||||
|
||||
for (index_uid, index) in self.index_resolver.list().await? {
|
||||
if !search_rules.is_index_authorized(&index_uid) {
|
||||
for index in self.list_indexes().await? {
|
||||
if !search_rules.is_index_authorized(&index.name) {
|
||||
continue;
|
||||
}
|
||||
let index_name = index.name.clone();
|
||||
|
||||
let (mut stats, meta) =
|
||||
spawn_blocking::<_, Result<(IndexStats, IndexMeta)>>(move || {
|
||||
|
@ -496,10 +419,10 @@ impl Meilisearch {
|
|||
// Check if the currently indexing update is from our index.
|
||||
stats.is_indexing = processing_tasks
|
||||
.first()
|
||||
.and_then(|p| p.index_uid().map(|u| u == index_uid))
|
||||
.and_then(|p| p.index_uid.as_ref().map(|u| u == &index_name))
|
||||
.or(Some(false));
|
||||
|
||||
indexes.insert(index_uid, stats);
|
||||
indexes.insert(index_name, stats);
|
||||
}
|
||||
|
||||
Ok(Stats {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue