diff --git a/src/data/mod.rs b/src/data/mod.rs index de24d0a06..8c512895c 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -9,8 +9,8 @@ use std::sync::Arc; use sha2::Digest; -use crate::index_controller::{IndexController, LocalIndexController}; -use crate::{option::Opt, index_controller::Settings}; +use crate::index_controller::{IndexController, LocalIndexController, IndexMetadata, Settings, IndexSettings}; +use crate::option::Opt; #[derive(Clone)] pub struct Data { @@ -114,6 +114,27 @@ impl Data { }) } + pub fn list_indexes(&self) -> anyhow::Result> { + self.index_controller.list_indexes() + } + + pub fn index(&self, name: impl AsRef) -> anyhow::Result> { + Ok(self + .list_indexes()? + .into_iter() + .find(|i| i.name == name.as_ref())) + } + + pub fn create_index(&self, name: impl AsRef, primary_key: Option>) -> anyhow::Result { + let settings = IndexSettings { + name: Some(name.as_ref().to_string()), + primary_key: primary_key.map(|s| s.as_ref().to_string()), + }; + + let meta = self.index_controller.create_index(settings)?; + Ok(meta) + } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize diff --git a/src/error.rs b/src/error.rs index c3533bcef..33aa06d3c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,6 +28,13 @@ impl fmt::Display for ResponseError { } } +// TODO: remove this when implementing actual error handling +impl From for ResponseError { + fn from(other: anyhow::Error) -> ResponseError { + ResponseError { inner: Box::new(Error::NotFound(other.to_string())) } + } +} + impl From for ResponseError { fn from(error: Error) -> ResponseError { ResponseError { inner: Box::new(error) } diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 483b6f5d6..d28228efd 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -2,8 +2,11 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use anyhow::{bail, Context}; +use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; +use log::error; use milli::Index; use rayon::ThreadPool; use serde::{Serialize, Deserialize}; @@ -16,10 +19,11 @@ use super::{UpdateMeta, UpdateResult}; type UpdateStore = super::update_store::UpdateStore; #[derive(Serialize, Deserialize, Debug, PartialEq)] -struct IndexMeta { +pub struct IndexMeta { update_store_size: u64, index_store_size: u64, - uuid: Uuid, + pub uuid: Uuid, + pub created_at: DateTime, } impl IndexMeta { @@ -50,9 +54,9 @@ impl IndexMeta { pub struct IndexStore { env: Env, - name_to_uuid_meta: Database, + name_to_uuid: Database, uuid_to_index: DashMap, Arc)>, - uuid_to_index_db: Database>, + uuid_to_index_meta: Database>, thread_pool: Arc, indexer_options: IndexerOpts, @@ -65,9 +69,9 @@ impl IndexStore { .max_dbs(2) .open(path)?; - let uid_to_index = DashMap::new(); - let name_to_uid_db = open_or_create_database(&env, Some("name_to_uid"))?; - let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?; + let uuid_to_index = DashMap::new(); + let name_to_uuid = open_or_create_database(&env, Some("name_to_uid"))?; + let uuid_to_index_meta = open_or_create_database(&env, Some("uid_to_index_db"))?; let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(indexer_options.indexing_jobs.unwrap_or(0)) @@ -76,9 +80,9 @@ impl IndexStore { Ok(Self { env, - name_to_uuid_meta: name_to_uid_db, - uuid_to_index: uid_to_index, - uuid_to_index_db: uid_to_index_db, + name_to_uuid, + uuid_to_index, + uuid_to_index_meta, thread_pool, indexer_options, @@ -86,7 +90,7 @@ impl IndexStore { } fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { - match self.name_to_uuid_meta.get(txn, name.as_ref())? { + match self.name_to_uuid.get(txn, name.as_ref())? { Some(bytes) => { let uuid = Uuid::from_slice(bytes)?; Ok(Some(uuid)) @@ -98,7 +102,7 @@ impl IndexStore { fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result, Arc)>> { match self.uuid_to_index.entry(uid.clone()) { Entry::Vacant(entry) => { - match self.uuid_to_index_db.get(txn, uid.as_bytes())? { + match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { Some(meta) => { let path = self.env.path(); let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; @@ -138,14 +142,14 @@ impl IndexStore { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); - let result = self.create_index(&mut txn, uuid, name, update_size, index_size)?; + let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; // If we fail to commit the transaction, we must delete the database from the // file-system. if let Err(e) = txn.commit() { self.clean_db(uuid); return Err(e)?; } - Ok(result) + Ok((index, updates)) }, } } @@ -161,17 +165,18 @@ impl IndexStore { self.uuid_to_index.remove(&uuid); } - fn create_index( &self, + fn create_index_txn( &self, txn: &mut RwTxn, uuid: Uuid, name: impl AsRef, - update_size: u64, - index_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { - let meta = IndexMeta { update_store_size: update_size, index_store_size: index_size, uuid: uuid.clone() }; + update_store_size: u64, + index_store_size: u64, + ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { + let created_at = Utc::now(); + let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at }; - self.name_to_uuid_meta.put(txn, name.as_ref(), uuid.as_bytes())?; - self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?; + self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?; + self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; let path = self.env.path(); let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { @@ -184,7 +189,56 @@ impl IndexStore { self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); - Ok((index, update_store)) + Ok((index, update_store, meta)) + } + + /// Same as `get_or_create`, but returns an error if the index already exists. + pub fn create_index( + &self, + name: impl AsRef, + update_size: u64, + index_size: u64, + ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { + let uuid = Uuid::new_v4(); + let mut txn = self.env.write_txn()?; + + if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() { + bail!("cannot create index {:?}: an index with this name already exists.") + } + + let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; + // If we fail to commit the transaction, we must delete the database from the + // file-system. + if let Err(e) = txn.commit() { + self.clean_db(uuid); + return Err(e)?; + } + Ok(result) + } + + /// Returns each index associated with its metadata: + /// (index_name, IndexMeta, primary_key) + /// This method will force all the indexes to be loaded. + pub fn list_indexes(&self) -> anyhow::Result)>> { + let txn = self.env.read_txn()?; + let metas = self.name_to_uuid + .iter(&txn)? + .filter_map(|entry| entry + .map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); + let mut indexes = Vec::new(); + for (name, uuid) in metas { + // get index to retrieve primary key + let (index, _) = self.get_index_txn(&txn, name)? + .with_context(|| format!("could not load index {:?}", name))?; + let primary_key = index.primary_key(&index.read_txn()?)? + .map(String::from); + // retieve meta + let meta = self.uuid_to_index_meta + .get(&txn, &uuid)? + .with_context(|| format!("could not retieve meta for index {:?}", name))?; + indexes.push((name.to_owned(), meta, primary_key)); + } + Ok(indexes) } } @@ -247,7 +301,7 @@ mod test { // insert an uuid in the the name_to_uuid_db: let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); txn.commit().unwrap(); // check that the uuid is there @@ -264,9 +318,14 @@ mod test { let txn = store.env.read_txn().unwrap(); assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); - let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { + update_store_size: 4096 * 100, + index_store_size: 4096 * 100, + uuid: uuid.clone(), + created_at: Utc::now(), + }; let mut txn = store.env.write_txn().unwrap(); - store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); // the index cache should be empty @@ -286,10 +345,15 @@ mod test { assert!(store.index(&name).unwrap().is_none()); let uuid = Uuid::new_v4(); - let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { + update_store_size: 4096 * 100, + index_store_size: 4096 * 100, + uuid: uuid.clone(), + created_at: Utc::now(), + }; let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); - store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); assert!(store.index(&name).unwrap().is_some()); @@ -301,14 +365,18 @@ mod test { let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); let name = "foobar"; - store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap(); + let update_store_size = 4096 * 100; + let index_store_size = 4096 * 100; + store.get_or_create_index(&name, update_store_size, index_store_size).unwrap(); let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); + let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; - assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); + let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + assert_eq!(meta.update_store_size, update_store_size); + assert_eq!(meta.index_store_size, index_store_size); + assert_eq!(meta.uuid, uuid); } #[test] @@ -317,17 +385,19 @@ mod test { let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); let name = "foobar"; - let update_size = 4096 * 100; - let index_size = 4096 * 100; + let update_store_size = 4096 * 100; + let index_store_size = 4096 * 100; let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap(); - let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); + store.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size).unwrap(); + let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = IndexMeta { update_store_size: update_size , index_store_size: index_size, uuid: uuid.clone() }; - assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); + let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + assert_eq!(meta.update_store_size, update_store_size); + assert_eq!(meta.index_store_size, index_store_size); + assert_eq!(meta.uuid, uuid); } } } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index b59eb2a99..60a02573f 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -5,7 +5,7 @@ mod update_handler; use std::path::Path; use std::sync::Arc; -use anyhow::bail; +use anyhow::{bail, Context}; use itertools::Itertools; use milli::Index; @@ -13,7 +13,7 @@ use crate::option::IndexerOpts; use index_store::IndexStore; use super::IndexController; use super::updates::UpdateStatus; -use super::{UpdateMeta, UpdateResult}; +use super::{UpdateMeta, UpdateResult, IndexMetadata, IndexSettings}; pub struct LocalIndexController { indexes: IndexStore, @@ -58,8 +58,25 @@ impl IndexController for LocalIndexController { Ok(pending.into()) } - fn create_index>(&self, _index_uid: S) -> anyhow::Result<()> { - todo!() + fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result { + let index_name = index_settings.name.context("Missing name for index")?; + let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?; + if let Some(ref primary_key) = index_settings.primary_key { + if let Err(e) = update_primary_key(index, primary_key).context("error creating index") { + // TODO: creating index could not be completed, delete everything. + Err(e)? + } + } + + let meta = IndexMetadata { + name: index_name, + uuid: meta.uuid.clone(), + created_at: meta.created_at, + updated_at: meta.created_at, + primary_key: index_settings.primary_key, + }; + + Ok(meta) } fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { @@ -102,4 +119,54 @@ impl IndexController for LocalIndexController { } } + + fn list_indexes(&self) -> anyhow::Result> { + let metas = self.indexes.list_indexes()?; + let mut output_meta = Vec::new(); + for (name, meta, primary_key) in metas { + let created_at = meta.created_at; + let uuid = meta.uuid; + let updated_at = self + .all_update_status(&name)? + .iter() + .filter_map(|u| u.processed().map(|u| u.processed_at)) + .max() + .unwrap_or(created_at); + + let index_meta = IndexMetadata { + name, + created_at, + updated_at, + uuid, + primary_key, + }; + output_meta.push(index_meta); + } + Ok(output_meta) + } +} + +fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { + let index = index.as_ref(); + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + bail!("primary key already set.") + } + index.put_primary_key(&mut txn, primary_key.as_ref())?; + txn.commit()?; + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use tempfile::tempdir; + use crate::make_index_controller_tests; + + make_index_controller_tests!({ + let options = IndexerOpts::default(); + let path = tempdir().unwrap(); + let size = 4096 * 100; + LocalIndexController::new(path, options, size, size).unwrap() + }); } diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index d348ee876..2c8dd1226 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -8,14 +8,26 @@ use std::num::NonZeroUsize; use std::sync::Arc; use anyhow::Result; +use chrono::{DateTime, Utc}; use milli::Index; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; +use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; pub type UpdateStatus = updates::UpdateStatus; +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMetadata { + pub name: String, + uuid: Uuid, + created_at: DateTime, + updated_at: DateTime, + primary_key: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum UpdateMeta { @@ -85,6 +97,11 @@ pub enum UpdateResult { Other, } +pub struct IndexSettings { + pub name: Option, + pub primary_key: Option, +} + /// The `IndexController` is in charge of the access to the underlying indices. It splits the logic /// for read access which is provided thanks to an handle to the index, and write access which must /// be provided. This allows the implementer to define the behaviour of write accesses to the @@ -115,7 +132,7 @@ pub trait IndexController { fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; /// Create an index with the given `index_uid`. - fn create_index>(&self, index_uid: S) -> Result<()>; + fn create_index(&self, index_settings: IndexSettings) -> Result; /// Delete index with the given `index_uid`, attempting to close it beforehand. fn delete_index>(&self, index_uid: S) -> Result<()>; @@ -140,4 +157,57 @@ pub trait IndexController { fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>; fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>; + + fn list_indexes(&self) -> anyhow::Result>; +} + + +#[cfg(test)] +#[macro_use] +pub(crate) mod test { + use super::*; + + #[macro_export] + macro_rules! make_index_controller_tests { + ($controller_buider:block) => { + #[test] + fn test_create_and_list_indexes() { + crate::index_controller::test::create_and_list_indexes($controller_buider); + } + + #[test] + fn test_create_index_with_no_name_is_error() { + crate::index_controller::test::create_index_with_no_name_is_error($controller_buider); + } + }; + } + + pub(crate) fn create_and_list_indexes(controller: S) { + let settings1 = IndexSettings { + name: Some(String::from("test_index")), + primary_key: None, + }; + + let settings2 = IndexSettings { + name: Some(String::from("test_index2")), + primary_key: Some(String::from("foo")), + }; + + controller.create_index(settings1).unwrap(); + controller.create_index(settings2).unwrap(); + + let indexes = controller.list_indexes().unwrap(); + assert_eq!(indexes.len(), 2); + assert_eq!(indexes[0].name, "test_index"); + assert_eq!(indexes[1].name, "test_index2"); + assert_eq!(indexes[1].primary_key.clone().unwrap(), "foo"); + } + + pub(crate) fn create_index_with_no_name_is_error(controller: S) { + let settings = IndexSettings { + name: None, + primary_key: None, + }; + assert!(controller.create_index(settings).is_err()); + } } diff --git a/src/index_controller/updates.rs b/src/index_controller/updates.rs index 4eb94dc4a..900987ba6 100644 --- a/src/index_controller/updates.rs +++ b/src/index_controller/updates.rs @@ -134,6 +134,13 @@ impl UpdateStatus { UpdateStatus::Failed(u) => u.id(), } } + + pub fn processed(&self) -> Option<&Processed> { + match self { + UpdateStatus::Processed(p) => Some(p), + _ => None, + } + } } impl From> for UpdateStatus { diff --git a/src/routes/document.rs b/src/routes/document.rs index aeec0e5df..dcc669f85 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -83,15 +83,6 @@ async fn get_all_documents( todo!() } -//fn find_primary_key(document: &IndexMap) -> Option { - //for key in document.keys() { - //if key.to_lowercase().contains("id") { - //return Some(key.to_string()); - //} - //} - //None -//} - #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct UpdateDocumentsQuery { @@ -150,6 +141,7 @@ async fn add_documents_default( _params: web::Query, _body: web::Json>, ) -> Result { + error!("Unknown document type"); todo!() } diff --git a/src/routes/index.rs b/src/routes/index.rs index 774039f2b..d682376e3 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -19,43 +19,60 @@ pub fn services(cfg: &mut web::ServiceConfig) { .service(get_all_updates_status); } -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexResponse { - pub name: String, - pub uid: String, - created_at: DateTime, - updated_at: DateTime, - pub primary_key: Option, -} #[get("/indexes", wrap = "Authentication::Private")] -async fn list_indexes(_data: web::Data) -> Result { - todo!() +async fn list_indexes(data: web::Data) -> Result { + match data.list_indexes() { + Ok(indexes) => { + let json = serde_json::to_string(&indexes).unwrap(); + Ok(HttpResponse::Ok().body(&json)) + } + Err(e) => { + error!("error listing indexes: {}", e); + unimplemented!() + } + } + } #[get("/indexes/{index_uid}", wrap = "Authentication::Private")] async fn get_index( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + match data.index(&path.index_uid)? { + Some(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + None => { + unimplemented!() + } + } } #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct IndexCreateRequest { - name: Option, - uid: Option, + name: String, primary_key: Option, } #[post("/indexes", wrap = "Authentication::Private")] async fn create_index( - _data: web::Data, - _body: web::Json, + data: web::Data, + body: web::Json, ) -> Result { - todo!() + match data.create_index(&body.name, body.primary_key.clone()) { + Ok(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + error!("error creating index: {}", e); + unimplemented!() + } + } } #[derive(Debug, Deserialize)]