From d43dc4824cf9aab5e122d1915944985d5ab8d871 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 3 Feb 2021 17:44:20 +0100 Subject: [PATCH 1/8] implement list indexes --- src/data/mod.rs | 8 ++- .../local_index_controller/index_store.rs | 67 +++++++++++++++---- .../local_index_controller/mod.rs | 27 +++++++- src/index_controller/mod.rs | 14 ++++ src/index_controller/updates.rs | 7 ++ src/routes/index.rs | 23 ++++--- 6 files changed, 120 insertions(+), 26 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index de24d0a06..cada4f559 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -9,8 +9,8 @@ use std::sync::Arc; use sha2::Digest; -use crate::index_controller::{IndexController, LocalIndexController}; -use crate::{option::Opt, index_controller::Settings}; +use crate::index_controller::{IndexController, LocalIndexController, IndexMetadata, Settings}; +use crate::option::Opt; #[derive(Clone)] pub struct Data { @@ -114,6 +114,10 @@ impl Data { }) } + pub fn list_indexes(&self) -> anyhow::Result> { + self.index_controller.list_indexes() + } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 483b6f5d6..138fe57fc 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -2,8 +2,10 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; +use log::error; use milli::Index; use rayon::ThreadPool; use serde::{Serialize, Deserialize}; @@ -16,10 +18,11 @@ use super::{UpdateMeta, UpdateResult}; type UpdateStore = super::update_store::UpdateStore; #[derive(Serialize, Deserialize, Debug, PartialEq)] -struct IndexMeta { - update_store_size: u64, - index_store_size: u64, - uuid: Uuid, +pub struct IndexMeta { + update_size: u64, + index_size: u64, + pub uuid: Uuid, + pub created_at: DateTime, } impl IndexMeta { @@ -168,7 +171,8 @@ impl IndexStore { update_size: u64, index_size: u64, ) -> anyhow::Result<(Arc, Arc)> { - let meta = IndexMeta { update_store_size: update_size, index_store_size: index_size, uuid: uuid.clone() }; + let created_at = Utc::now(); + let meta = IndexMeta { update_size, index_size, uuid: uuid.clone(), created_at }; self.name_to_uuid_meta.put(txn, name.as_ref(), uuid.as_bytes())?; self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?; @@ -186,6 +190,29 @@ impl IndexStore { Ok((index, update_store)) } + + /// Returns each index associated with it's metadata; + pub fn list_indexes(&self) -> anyhow::Result> { + let txn = self.env.read_txn()?; + let indexes = self.name_to_uuid_db + .iter(&txn)? + .filter_map(|entry| entry + .map_err(|e| { + error!("error decoding entry while listing indexes: {}", e); + e + }) + .ok()) + .map(|(name, uuid)| { + let meta = self.uuid_to_index_db + .get(&txn, &uuid) + .ok() + .flatten() + .unwrap_or_else(|| panic!("corrupted database, index {} should exist.", name)); + (name.to_owned(), meta) + }) + .collect(); + Ok(indexes) + } } fn open_or_create_database(env: &Env, name: Option<&str>) -> anyhow::Result> { @@ -264,7 +291,12 @@ mod test { let txn = store.env.read_txn().unwrap(); assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); - let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { + update_size: 4096 * 100, + index_size: 4096 * 100, + uuid: uuid.clone(), + created_at: Utc::now(), + }; let mut txn = store.env.write_txn().unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); @@ -286,7 +318,12 @@ mod test { assert!(store.index(&name).unwrap().is_none()); let uuid = Uuid::new_v4(); - let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { + update_size: 4096 * 100, + index_size: 4096 * 100, + uuid: uuid.clone(), + created_at: Utc::now(), + }; let mut txn = store.env.write_txn().unwrap(); store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); @@ -301,14 +338,18 @@ mod test { let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); let name = "foobar"; - store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap(); + let update_size = 4096 * 100; + let index_size = 4096 * 100; + store.get_or_create_index(&name, update_size, index_size).unwrap(); let txn = store.env.read_txn().unwrap(); let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; - assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); + let meta = store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + assert_eq!(meta.update_size, update_size); + assert_eq!(meta.index_size, index_size); + assert_eq!(meta.uuid, uuid); } #[test] @@ -326,8 +367,10 @@ mod test { assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = IndexMeta { update_store_size: update_size , index_store_size: index_size, uuid: uuid.clone() }; - assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); + let meta = store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + assert_eq!(meta.update_size, update_size); + assert_eq!(meta.index_size, index_size); + assert_eq!(meta.uuid, uuid); } } } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index b59eb2a99..6d6700639 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -13,7 +13,7 @@ use crate::option::IndexerOpts; use index_store::IndexStore; use super::IndexController; use super::updates::UpdateStatus; -use super::{UpdateMeta, UpdateResult}; +use super::{UpdateMeta, UpdateResult, IndexMetadata}; pub struct LocalIndexController { indexes: IndexStore, @@ -102,4 +102,29 @@ impl IndexController for LocalIndexController { } } + + fn list_indexes(&self) -> anyhow::Result> { + let metas = self.indexes.list_indexes()?; + let mut output_meta = Vec::new(); + for (name, meta) in metas { + let created_at = meta.created_at; + let uuid = meta.uuid; + let updated_at = self + .all_update_status(&name)? + .iter() + .filter_map(|u| u.processed().map(|u| u.processed_at)) + .max() + .unwrap_or(created_at); + + let index_meta = IndexMetadata { + name, + created_at, + updated_at, + uuid, + primary_key: None, + }; + output_meta.push(index_meta); + } + Ok(output_meta) + } } diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index d348ee876..38ff149bf 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -8,14 +8,26 @@ use std::num::NonZeroUsize; use std::sync::Arc; use anyhow::Result; +use chrono::{DateTime, Utc}; use milli::Index; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; +use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; pub type UpdateStatus = updates::UpdateStatus; +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMetadata { + pub name: String, + uuid: Uuid, + created_at: DateTime, + updated_at: DateTime, + pub primary_key: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum UpdateMeta { @@ -140,4 +152,6 @@ pub trait IndexController { fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>; fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>; + + fn list_indexes(&self) -> anyhow::Result>; } diff --git a/src/index_controller/updates.rs b/src/index_controller/updates.rs index 4eb94dc4a..900987ba6 100644 --- a/src/index_controller/updates.rs +++ b/src/index_controller/updates.rs @@ -134,6 +134,13 @@ impl UpdateStatus { UpdateStatus::Failed(u) => u.id(), } } + + pub fn processed(&self) -> Option<&Processed> { + match self { + UpdateStatus::Processed(p) => Some(p), + _ => None, + } + } } impl From> for UpdateStatus { diff --git a/src/routes/index.rs b/src/routes/index.rs index 774039f2b..653d51475 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -19,19 +19,20 @@ pub fn services(cfg: &mut web::ServiceConfig) { .service(get_all_updates_status); } -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexResponse { - pub name: String, - pub uid: String, - created_at: DateTime, - updated_at: DateTime, - pub primary_key: Option, -} #[get("/indexes", wrap = "Authentication::Private")] -async fn list_indexes(_data: web::Data) -> Result { - todo!() +async fn list_indexes(data: web::Data) -> Result { + match data.list_indexes() { + Ok(indexes) => { + let json = serde_json::to_string(&indexes).unwrap(); + Ok(HttpResponse::Ok().body(&json)) + } + Err(e) => { + error!("error listing indexes: {}", e); + unimplemented!() + } + } + } #[get("/indexes/{index_uid}", wrap = "Authentication::Private")] From f98830669186c0aaba33eb499486318ddc4511e1 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 3 Feb 2021 20:12:48 +0100 Subject: [PATCH 2/8] implement create index --- .../local_index_controller/index_store.rs | 33 ++++++++++++++++--- .../local_index_controller/mod.rs | 5 +-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 138fe57fc..7303a7849 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -2,6 +2,7 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use anyhow::bail; use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; @@ -141,7 +142,7 @@ impl IndexStore { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); - let result = self.create_index(&mut txn, uuid, name, update_size, index_size)?; + let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; // If we fail to commit the transaction, we must delete the database from the // file-system. if let Err(e) = txn.commit() { @@ -164,7 +165,7 @@ impl IndexStore { self.uuid_to_index.remove(&uuid); } - fn create_index( &self, + fn create_index_txn( &self, txn: &mut RwTxn, uuid: Uuid, name: impl AsRef, @@ -191,6 +192,30 @@ impl IndexStore { Ok((index, update_store)) } + /// Same a get or create, but returns an error if the index already exists. + pub fn create_index( + &self, + name: impl AsRef, + update_size: u64, + index_size: u64, + ) -> anyhow::Result<(Arc, Arc)> { + let uuid = Uuid::new_v4(); + let mut txn = self.env.write_txn()?; + + if self.name_to_uuid_db.get(&txn, name.as_ref())?.is_some() { + bail!("cannot create index {:?}: an index with this name already exists.") + } + + let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; + // If we fail to commit the transaction, we must delete the database from the + // file-system. + if let Err(e) = txn.commit() { + self.clean_db(uuid); + return Err(e)?; + } + Ok(result) + } + /// Returns each index associated with it's metadata; pub fn list_indexes(&self) -> anyhow::Result> { let txn = self.env.read_txn()?; @@ -362,8 +387,8 @@ mod test { let index_size = 4096 * 100; let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap(); - let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); + store.create_index_txn(&mut txn, uuid, name, update_size, index_size).unwrap(); + let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 6d6700639..ec70c4443 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -58,8 +58,9 @@ impl IndexController for LocalIndexController { Ok(pending.into()) } - fn create_index>(&self, _index_uid: S) -> anyhow::Result<()> { - todo!() + fn create_index>(&self, index_uid: S) -> anyhow::Result<()> { + self.indexes.create_index(index_uid, self.update_db_size, self.index_db_size)?; + Ok(()) } fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { From 8d462afb79b3048b722367365ea7f0cbeb81d831 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 3 Feb 2021 20:20:51 +0100 Subject: [PATCH 3/8] add tests for list index and create index. --- .../local_index_controller/mod.rs | 14 ++++++++++ src/index_controller/mod.rs | 27 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index ec70c4443..21139e636 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -129,3 +129,17 @@ impl IndexController for LocalIndexController { Ok(output_meta) } } + +#[cfg(test)] +mod test { + use super::*; + use tempfile::tempdir; + use crate::make_index_controller_tests; + + make_index_controller_tests!({ + let options = IndexerOpts::default(); + let path = tempdir().unwrap(); + let size = 4096 * 100; + LocalIndexController::new(path, options, size, size).unwrap() + }); +} diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 38ff149bf..720c52cdc 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -155,3 +155,30 @@ pub trait IndexController { fn list_indexes(&self) -> anyhow::Result>; } + + +#[cfg(test)] +#[macro_use] +pub(crate) mod test { + use super::*; + + #[macro_export] + macro_rules! make_index_controller_tests { + ($controller_buider:block) => { + #[test] + fn test_create_and_list_indexes() { + crate::index_controller::test::create_and_list_indexes($controller_buider); + } + }; + } + + pub(crate) fn create_and_list_indexes(controller: S) { + controller.create_index("test_index").unwrap(); + controller.create_index("test_index2").unwrap(); + + let indexes = controller.list_indexes().unwrap(); + assert_eq!(indexes.len(), 2); + assert_eq!(indexes[0].name, "test_index"); + assert_eq!(indexes[1].name, "test_index2"); + } +} From f1c09a54be15648ade94fc9354199b546c91f514 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 4 Feb 2021 12:34:12 +0100 Subject: [PATCH 4/8] implement get index meta --- src/data/mod.rs | 7 +++++++ src/error.rs | 7 +++++++ src/routes/document.rs | 10 +--------- src/routes/index.rs | 14 +++++++++++--- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index cada4f559..61945ee2a 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -118,6 +118,13 @@ impl Data { self.index_controller.list_indexes() } + pub fn index(&self, name: impl AsRef) -> anyhow::Result> { + Ok(self + .list_indexes()? + .into_iter() + .find(|i| i.name == name.as_ref())) + } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize diff --git a/src/error.rs b/src/error.rs index c3533bcef..33aa06d3c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,6 +28,13 @@ impl fmt::Display for ResponseError { } } +// TODO: remove this when implementing actual error handling +impl From for ResponseError { + fn from(other: anyhow::Error) -> ResponseError { + ResponseError { inner: Box::new(Error::NotFound(other.to_string())) } + } +} + impl From for ResponseError { fn from(error: Error) -> ResponseError { ResponseError { inner: Box::new(error) } diff --git a/src/routes/document.rs b/src/routes/document.rs index aeec0e5df..dcc669f85 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -83,15 +83,6 @@ async fn get_all_documents( todo!() } -//fn find_primary_key(document: &IndexMap) -> Option { - //for key in document.keys() { - //if key.to_lowercase().contains("id") { - //return Some(key.to_string()); - //} - //} - //None -//} - #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct UpdateDocumentsQuery { @@ -150,6 +141,7 @@ async fn add_documents_default( _params: web::Query, _body: web::Json>, ) -> Result { + error!("Unknown document type"); todo!() } diff --git a/src/routes/index.rs b/src/routes/index.rs index 653d51475..5e2574267 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -37,10 +37,18 @@ async fn list_indexes(data: web::Data) -> Result, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + match data.index(&path.index_uid)? { + Some(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + None => { + unimplemented!() + } + } } #[derive(Debug, Deserialize)] From f18e795124b5cb5af7486b366eb27d88ca6c7313 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 4 Feb 2021 15:09:43 +0100 Subject: [PATCH 5/8] fix rebase --- .../local_index_controller/index_store.rs | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 7303a7849..eca18444f 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -20,8 +20,8 @@ type UpdateStore = super::update_store::UpdateStore, } @@ -54,9 +54,9 @@ impl IndexMeta { pub struct IndexStore { env: Env, - name_to_uuid_meta: Database, + name_to_uuid: Database, uuid_to_index: DashMap, Arc)>, - uuid_to_index_db: Database>, + uuid_to_index_meta: Database>, thread_pool: Arc, indexer_options: IndexerOpts, @@ -69,9 +69,9 @@ impl IndexStore { .max_dbs(2) .open(path)?; - let uid_to_index = DashMap::new(); - let name_to_uid_db = open_or_create_database(&env, Some("name_to_uid"))?; - let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?; + let uuid_to_index = DashMap::new(); + let name_to_uuid = open_or_create_database(&env, Some("name_to_uid"))?; + let uuid_to_index_meta = open_or_create_database(&env, Some("uid_to_index_db"))?; let thread_pool = rayon::ThreadPoolBuilder::new() .num_threads(indexer_options.indexing_jobs.unwrap_or(0)) @@ -80,9 +80,9 @@ impl IndexStore { Ok(Self { env, - name_to_uuid_meta: name_to_uid_db, - uuid_to_index: uid_to_index, - uuid_to_index_db: uid_to_index_db, + name_to_uuid, + uuid_to_index, + uuid_to_index_meta, thread_pool, indexer_options, @@ -90,7 +90,7 @@ impl IndexStore { } fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { - match self.name_to_uuid_meta.get(txn, name.as_ref())? { + match self.name_to_uuid.get(txn, name.as_ref())? { Some(bytes) => { let uuid = Uuid::from_slice(bytes)?; Ok(Some(uuid)) @@ -102,7 +102,7 @@ impl IndexStore { fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result, Arc)>> { match self.uuid_to_index.entry(uid.clone()) { Entry::Vacant(entry) => { - match self.uuid_to_index_db.get(txn, uid.as_bytes())? { + match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { Some(meta) => { let path = self.env.path(); let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; @@ -169,14 +169,14 @@ impl IndexStore { txn: &mut RwTxn, uuid: Uuid, name: impl AsRef, - update_size: u64, - index_size: u64, + update_store_size: u64, + index_store_size: u64, ) -> anyhow::Result<(Arc, Arc)> { let created_at = Utc::now(); - let meta = IndexMeta { update_size, index_size, uuid: uuid.clone(), created_at }; + let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at }; - self.name_to_uuid_meta.put(txn, name.as_ref(), uuid.as_bytes())?; - self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?; + self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?; + self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; let path = self.env.path(); let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { @@ -202,7 +202,7 @@ impl IndexStore { let uuid = Uuid::new_v4(); let mut txn = self.env.write_txn()?; - if self.name_to_uuid_db.get(&txn, name.as_ref())?.is_some() { + if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() { bail!("cannot create index {:?}: an index with this name already exists.") } @@ -219,7 +219,7 @@ impl IndexStore { /// Returns each index associated with it's metadata; pub fn list_indexes(&self) -> anyhow::Result> { let txn = self.env.read_txn()?; - let indexes = self.name_to_uuid_db + let indexes = self.name_to_uuid .iter(&txn)? .filter_map(|entry| entry .map_err(|e| { @@ -228,7 +228,7 @@ impl IndexStore { }) .ok()) .map(|(name, uuid)| { - let meta = self.uuid_to_index_db + let meta = self.uuid_to_index_meta .get(&txn, &uuid) .ok() .flatten() @@ -299,7 +299,7 @@ mod test { // insert an uuid in the the name_to_uuid_db: let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); txn.commit().unwrap(); // check that the uuid is there @@ -317,13 +317,13 @@ mod test { assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); let meta = IndexMeta { - update_size: 4096 * 100, - index_size: 4096 * 100, + update_store_size: 4096 * 100, + index_store_size: 4096 * 100, uuid: uuid.clone(), created_at: Utc::now(), }; let mut txn = store.env.write_txn().unwrap(); - store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); // the index cache should be empty @@ -344,14 +344,14 @@ mod test { let uuid = Uuid::new_v4(); let meta = IndexMeta { - update_size: 4096 * 100, - index_size: 4096 * 100, + update_store_size: 4096 * 100, + index_store_size: 4096 * 100, uuid: uuid.clone(), created_at: Utc::now(), }; let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); - store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); assert!(store.index(&name).unwrap().is_some()); @@ -363,17 +363,17 @@ mod test { let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); let name = "foobar"; - let update_size = 4096 * 100; - let index_size = 4096 * 100; - store.get_or_create_index(&name, update_size, index_size).unwrap(); + let update_store_size = 4096 * 100; + let index_store_size = 4096 * 100; + store.get_or_create_index(&name, update_store_size, index_store_size).unwrap(); let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); + let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap().unwrap(); - assert_eq!(meta.update_size, update_size); - assert_eq!(meta.index_size, index_size); + let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + assert_eq!(meta.update_store_size, update_store_size); + assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.uuid, uuid); } @@ -383,18 +383,18 @@ mod test { let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); let name = "foobar"; - let update_size = 4096 * 100; - let index_size = 4096 * 100; + let update_store_size = 4096 * 100; + let index_store_size = 4096 * 100; let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.create_index_txn(&mut txn, uuid, name, update_size, index_size).unwrap(); + store.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size).unwrap(); let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap().unwrap(); - assert_eq!(meta.update_size, update_size); - assert_eq!(meta.index_size, index_size); + let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + assert_eq!(meta.update_store_size, update_store_size); + assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.uuid, uuid); } } From ed44e684ccc70062ff8514db98710f1f43a540b8 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 4 Feb 2021 15:28:52 +0100 Subject: [PATCH 6/8] review fixes --- .../local_index_controller/index_store.rs | 26 +++++++------------ src/index_controller/mod.rs | 2 +- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index eca18444f..937a0cac0 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -192,7 +192,7 @@ impl IndexStore { Ok((index, update_store)) } - /// Same a get or create, but returns an error if the index already exists. + /// Same as `get_or_create`, but returns an error if the index already exists. pub fn create_index( &self, name: impl AsRef, @@ -219,23 +219,17 @@ impl IndexStore { /// Returns each index associated with it's metadata; pub fn list_indexes(&self) -> anyhow::Result> { let txn = self.env.read_txn()?; - let indexes = self.name_to_uuid + let metas = self.name_to_uuid .iter(&txn)? .filter_map(|entry| entry - .map_err(|e| { - error!("error decoding entry while listing indexes: {}", e); - e - }) - .ok()) - .map(|(name, uuid)| { - let meta = self.uuid_to_index_meta - .get(&txn, &uuid) - .ok() - .flatten() - .unwrap_or_else(|| panic!("corrupted database, index {} should exist.", name)); - (name.to_owned(), meta) - }) - .collect(); + .map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); + let mut indexes = Vec::new(); + for (name, uuid) in metas { + let meta = self.uuid_to_index_meta + .get(&txn, &uuid)? + .unwrap_or_else(|| panic!("corrupted database, index {} should exist.", name)); + indexes.push((name.to_owned(), meta)); + } Ok(indexes) } } diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 720c52cdc..13ecc1486 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -25,7 +25,7 @@ pub struct IndexMetadata { uuid: Uuid, created_at: DateTime, updated_at: DateTime, - pub primary_key: Option, + primary_key: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] From ec047eefd2abec39c4d4c3c17c8da4c87c1b917f Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 8 Feb 2021 10:47:34 +0100 Subject: [PATCH 7/8] implement create index --- src/data/mod.rs | 5 +++ .../local_index_controller/index_store.rs | 28 +++++++++----- .../local_index_controller/mod.rs | 38 ++++++++++++++++--- src/index_controller/mod.rs | 2 +- src/routes/index.rs | 18 ++++++--- 5 files changed, 69 insertions(+), 22 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index 61945ee2a..65364df00 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -125,6 +125,11 @@ impl Data { .find(|i| i.name == name.as_ref())) } + pub fn create_index(&self, name: impl AsRef, primary_key: Option>) -> anyhow::Result { + let meta = self.index_controller.create_index(name, primary_key)?; + Ok(meta) + } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 937a0cac0..d28228efd 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -2,7 +2,7 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use anyhow::bail; +use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; @@ -142,14 +142,14 @@ impl IndexStore { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); - let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; + let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; // If we fail to commit the transaction, we must delete the database from the // file-system. if let Err(e) = txn.commit() { self.clean_db(uuid); return Err(e)?; } - Ok(result) + Ok((index, updates)) }, } } @@ -171,7 +171,7 @@ impl IndexStore { name: impl AsRef, update_store_size: u64, index_store_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { + ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { let created_at = Utc::now(); let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at }; @@ -189,7 +189,7 @@ impl IndexStore { self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); - Ok((index, update_store)) + Ok((index, update_store, meta)) } /// Same as `get_or_create`, but returns an error if the index already exists. @@ -198,7 +198,7 @@ impl IndexStore { name: impl AsRef, update_size: u64, index_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { + ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { let uuid = Uuid::new_v4(); let mut txn = self.env.write_txn()?; @@ -216,8 +216,10 @@ impl IndexStore { Ok(result) } - /// Returns each index associated with it's metadata; - pub fn list_indexes(&self) -> anyhow::Result> { + /// Returns each index associated with its metadata: + /// (index_name, IndexMeta, primary_key) + /// This method will force all the indexes to be loaded. + pub fn list_indexes(&self) -> anyhow::Result)>> { let txn = self.env.read_txn()?; let metas = self.name_to_uuid .iter(&txn)? @@ -225,10 +227,16 @@ impl IndexStore { .map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); let mut indexes = Vec::new(); for (name, uuid) in metas { + // get index to retrieve primary key + let (index, _) = self.get_index_txn(&txn, name)? + .with_context(|| format!("could not load index {:?}", name))?; + let primary_key = index.primary_key(&index.read_txn()?)? + .map(String::from); + // retieve meta let meta = self.uuid_to_index_meta .get(&txn, &uuid)? - .unwrap_or_else(|| panic!("corrupted database, index {} should exist.", name)); - indexes.push((name.to_owned(), meta)); + .with_context(|| format!("could not retieve meta for index {:?}", name))?; + indexes.push((name.to_owned(), meta, primary_key)); } Ok(indexes) } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 21139e636..9624a9c64 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -5,7 +5,7 @@ mod update_handler; use std::path::Path; use std::sync::Arc; -use anyhow::bail; +use anyhow::{bail, Context}; use itertools::Itertools; use milli::Index; @@ -58,9 +58,24 @@ impl IndexController for LocalIndexController { Ok(pending.into()) } - fn create_index>(&self, index_uid: S) -> anyhow::Result<()> { - self.indexes.create_index(index_uid, self.update_db_size, self.index_db_size)?; - Ok(()) + fn create_index(&self, index_name: impl AsRef, primary_key: Option>) -> anyhow::Result { + let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?; + if let Some(ref primary_key) = primary_key { + if let Err(e) = update_primary_key(index, primary_key).context("error creating index") { + // TODO: creating index could not be completed, delete everything. + Err(e)? + } + } + + let meta = IndexMetadata { + name: index_name.as_ref().to_owned(), + uuid: meta.uuid.clone(), + created_at: meta.created_at, + updated_at: meta.created_at, + primary_key: primary_key.map(|n| n.as_ref().to_owned()), + }; + + Ok(meta) } fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { @@ -107,7 +122,7 @@ impl IndexController for LocalIndexController { fn list_indexes(&self) -> anyhow::Result> { let metas = self.indexes.list_indexes()?; let mut output_meta = Vec::new(); - for (name, meta) in metas { + for (name, meta, primary_key) in metas { let created_at = meta.created_at; let uuid = meta.uuid; let updated_at = self @@ -122,7 +137,7 @@ impl IndexController for LocalIndexController { created_at, updated_at, uuid, - primary_key: None, + primary_key, }; output_meta.push(index_meta); } @@ -130,6 +145,17 @@ impl IndexController for LocalIndexController { } } +fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { + let index = index.as_ref(); + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + bail!("primary key already set.") + } + index.put_primary_key(&mut txn, primary_key.as_ref())?; + txn.commit()?; + Ok(()) +} + #[cfg(test)] mod test { use super::*; diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 13ecc1486..d5bf873ee 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -127,7 +127,7 @@ pub trait IndexController { fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; /// Create an index with the given `index_uid`. - fn create_index>(&self, index_uid: S) -> Result<()>; + fn create_index(&self, index_uid: impl AsRef, primary_key: Option>) -> Result; /// Delete index with the given `index_uid`, attempting to close it beforehand. fn delete_index>(&self, index_uid: S) -> Result<()>; diff --git a/src/routes/index.rs b/src/routes/index.rs index 5e2574267..d682376e3 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -54,17 +54,25 @@ async fn get_index( #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct IndexCreateRequest { - name: Option, - uid: Option, + name: String, primary_key: Option, } #[post("/indexes", wrap = "Authentication::Private")] async fn create_index( - _data: web::Data, - _body: web::Json, + data: web::Data, + body: web::Json, ) -> Result { - todo!() + match data.create_index(&body.name, body.primary_key.clone()) { + Ok(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + error!("error creating index: {}", e); + unimplemented!() + } + } } #[derive(Debug, Deserialize)] From e89b11b1fa75501f6cd75ac3fda4a45ba0af7e48 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 9 Feb 2021 11:41:26 +0100 Subject: [PATCH 8/8] create IndexSetting struct need to stabilize the create index trait interface --- src/data/mod.rs | 9 +++-- .../local_index_controller/mod.rs | 11 +++--- src/index_controller/mod.rs | 35 +++++++++++++++++-- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index 65364df00..8c512895c 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use sha2::Digest; -use crate::index_controller::{IndexController, LocalIndexController, IndexMetadata, Settings}; +use crate::index_controller::{IndexController, LocalIndexController, IndexMetadata, Settings, IndexSettings}; use crate::option::Opt; #[derive(Clone)] @@ -126,7 +126,12 @@ impl Data { } pub fn create_index(&self, name: impl AsRef, primary_key: Option>) -> anyhow::Result { - let meta = self.index_controller.create_index(name, primary_key)?; + let settings = IndexSettings { + name: Some(name.as_ref().to_string()), + primary_key: primary_key.map(|s| s.as_ref().to_string()), + }; + + let meta = self.index_controller.create_index(settings)?; Ok(meta) } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 9624a9c64..60a02573f 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -13,7 +13,7 @@ use crate::option::IndexerOpts; use index_store::IndexStore; use super::IndexController; use super::updates::UpdateStatus; -use super::{UpdateMeta, UpdateResult, IndexMetadata}; +use super::{UpdateMeta, UpdateResult, IndexMetadata, IndexSettings}; pub struct LocalIndexController { indexes: IndexStore, @@ -58,9 +58,10 @@ impl IndexController for LocalIndexController { Ok(pending.into()) } - fn create_index(&self, index_name: impl AsRef, primary_key: Option>) -> anyhow::Result { + fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result { + let index_name = index_settings.name.context("Missing name for index")?; let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?; - if let Some(ref primary_key) = primary_key { + if let Some(ref primary_key) = index_settings.primary_key { if let Err(e) = update_primary_key(index, primary_key).context("error creating index") { // TODO: creating index could not be completed, delete everything. Err(e)? @@ -68,11 +69,11 @@ impl IndexController for LocalIndexController { } let meta = IndexMetadata { - name: index_name.as_ref().to_owned(), + name: index_name, uuid: meta.uuid.clone(), created_at: meta.created_at, updated_at: meta.created_at, - primary_key: primary_key.map(|n| n.as_ref().to_owned()), + primary_key: index_settings.primary_key, }; Ok(meta) diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index d5bf873ee..2c8dd1226 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -97,6 +97,11 @@ pub enum UpdateResult { Other, } +pub struct IndexSettings { + pub name: Option, + pub primary_key: Option, +} + /// The `IndexController` is in charge of the access to the underlying indices. It splits the logic /// for read access which is provided thanks to an handle to the index, and write access which must /// be provided. This allows the implementer to define the behaviour of write accesses to the @@ -127,7 +132,7 @@ pub trait IndexController { fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; /// Create an index with the given `index_uid`. - fn create_index(&self, index_uid: impl AsRef, primary_key: Option>) -> Result; + fn create_index(&self, index_settings: IndexSettings) -> Result; /// Delete index with the given `index_uid`, attempting to close it beforehand. fn delete_index>(&self, index_uid: S) -> Result<()>; @@ -169,16 +174,40 @@ pub(crate) mod test { fn test_create_and_list_indexes() { crate::index_controller::test::create_and_list_indexes($controller_buider); } + + #[test] + fn test_create_index_with_no_name_is_error() { + crate::index_controller::test::create_index_with_no_name_is_error($controller_buider); + } }; } pub(crate) fn create_and_list_indexes(controller: S) { - controller.create_index("test_index").unwrap(); - controller.create_index("test_index2").unwrap(); + let settings1 = IndexSettings { + name: Some(String::from("test_index")), + primary_key: None, + }; + + let settings2 = IndexSettings { + name: Some(String::from("test_index2")), + primary_key: Some(String::from("foo")), + }; + + controller.create_index(settings1).unwrap(); + controller.create_index(settings2).unwrap(); let indexes = controller.list_indexes().unwrap(); assert_eq!(indexes.len(), 2); assert_eq!(indexes[0].name, "test_index"); assert_eq!(indexes[1].name, "test_index2"); + assert_eq!(indexes[1].primary_key.clone().unwrap(), "foo"); + } + + pub(crate) fn create_index_with_no_name_is_error(controller: S) { + let settings = IndexSettings { + name: None, + primary_key: None, + }; + assert!(controller.create_index(settings).is_err()); } }