From ec047eefd2abec39c4d4c3c17c8da4c87c1b917f Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 8 Feb 2021 10:47:34 +0100 Subject: [PATCH] implement create index --- src/data/mod.rs | 5 +++ .../local_index_controller/index_store.rs | 28 +++++++++----- .../local_index_controller/mod.rs | 38 ++++++++++++++++--- src/index_controller/mod.rs | 2 +- src/routes/index.rs | 18 ++++++--- 5 files changed, 69 insertions(+), 22 deletions(-) diff --git a/src/data/mod.rs b/src/data/mod.rs index 61945ee2a..65364df00 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -125,6 +125,11 @@ impl Data { .find(|i| i.name == name.as_ref())) } + pub fn create_index(&self, name: impl AsRef, primary_key: Option>) -> anyhow::Result { + let meta = self.index_controller.create_index(name, primary_key)?; + Ok(meta) + } + #[inline] pub fn http_payload_size_limit(&self) -> usize { self.options.http_payload_size_limit.get_bytes() as usize diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 937a0cac0..d28228efd 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -2,7 +2,7 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use anyhow::bail; +use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; @@ -142,14 +142,14 @@ impl IndexStore { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); - let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; + let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; // If we fail to commit the transaction, we must delete the database from the // file-system. if let Err(e) = txn.commit() { self.clean_db(uuid); return Err(e)?; } - Ok(result) + Ok((index, updates)) }, } } @@ -171,7 +171,7 @@ impl IndexStore { name: impl AsRef, update_store_size: u64, index_store_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { + ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { let created_at = Utc::now(); let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at }; @@ -189,7 +189,7 @@ impl IndexStore { self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); - Ok((index, update_store)) + Ok((index, update_store, meta)) } /// Same as `get_or_create`, but returns an error if the index already exists. @@ -198,7 +198,7 @@ impl IndexStore { name: impl AsRef, update_size: u64, index_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { + ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { let uuid = Uuid::new_v4(); let mut txn = self.env.write_txn()?; @@ -216,8 +216,10 @@ impl IndexStore { Ok(result) } - /// Returns each index associated with it's metadata; - pub fn list_indexes(&self) -> anyhow::Result> { + /// Returns each index associated with its metadata: + /// (index_name, IndexMeta, primary_key) + /// This method will force all the indexes to be loaded. + pub fn list_indexes(&self) -> anyhow::Result)>> { let txn = self.env.read_txn()?; let metas = self.name_to_uuid .iter(&txn)? @@ -225,10 +227,16 @@ impl IndexStore { .map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); let mut indexes = Vec::new(); for (name, uuid) in metas { + // get index to retrieve primary key + let (index, _) = self.get_index_txn(&txn, name)? + .with_context(|| format!("could not load index {:?}", name))?; + let primary_key = index.primary_key(&index.read_txn()?)? + .map(String::from); + // retieve meta let meta = self.uuid_to_index_meta .get(&txn, &uuid)? - .unwrap_or_else(|| panic!("corrupted database, index {} should exist.", name)); - indexes.push((name.to_owned(), meta)); + .with_context(|| format!("could not retieve meta for index {:?}", name))?; + indexes.push((name.to_owned(), meta, primary_key)); } Ok(indexes) } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 21139e636..9624a9c64 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -5,7 +5,7 @@ mod update_handler; use std::path::Path; use std::sync::Arc; -use anyhow::bail; +use anyhow::{bail, Context}; use itertools::Itertools; use milli::Index; @@ -58,9 +58,24 @@ impl IndexController for LocalIndexController { Ok(pending.into()) } - fn create_index>(&self, index_uid: S) -> anyhow::Result<()> { - self.indexes.create_index(index_uid, self.update_db_size, self.index_db_size)?; - Ok(()) + fn create_index(&self, index_name: impl AsRef, primary_key: Option>) -> anyhow::Result { + let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?; + if let Some(ref primary_key) = primary_key { + if let Err(e) = update_primary_key(index, primary_key).context("error creating index") { + // TODO: creating index could not be completed, delete everything. + Err(e)? + } + } + + let meta = IndexMetadata { + name: index_name.as_ref().to_owned(), + uuid: meta.uuid.clone(), + created_at: meta.created_at, + updated_at: meta.created_at, + primary_key: primary_key.map(|n| n.as_ref().to_owned()), + }; + + Ok(meta) } fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { @@ -107,7 +122,7 @@ impl IndexController for LocalIndexController { fn list_indexes(&self) -> anyhow::Result> { let metas = self.indexes.list_indexes()?; let mut output_meta = Vec::new(); - for (name, meta) in metas { + for (name, meta, primary_key) in metas { let created_at = meta.created_at; let uuid = meta.uuid; let updated_at = self @@ -122,7 +137,7 @@ impl IndexController for LocalIndexController { created_at, updated_at, uuid, - primary_key: None, + primary_key, }; output_meta.push(index_meta); } @@ -130,6 +145,17 @@ impl IndexController for LocalIndexController { } } +fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { + let index = index.as_ref(); + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + bail!("primary key already set.") + } + index.put_primary_key(&mut txn, primary_key.as_ref())?; + txn.commit()?; + Ok(()) +} + #[cfg(test)] mod test { use super::*; diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 13ecc1486..d5bf873ee 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -127,7 +127,7 @@ pub trait IndexController { fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; /// Create an index with the given `index_uid`. - fn create_index>(&self, index_uid: S) -> Result<()>; + fn create_index(&self, index_uid: impl AsRef, primary_key: Option>) -> Result; /// Delete index with the given `index_uid`, attempting to close it beforehand. fn delete_index>(&self, index_uid: S) -> Result<()>; diff --git a/src/routes/index.rs b/src/routes/index.rs index 5e2574267..d682376e3 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -54,17 +54,25 @@ async fn get_index( #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct IndexCreateRequest { - name: Option, - uid: Option, + name: String, primary_key: Option, } #[post("/indexes", wrap = "Authentication::Private")] async fn create_index( - _data: web::Data, - _body: web::Json, + data: web::Data, + body: web::Json, ) -> Result { - todo!() + match data.create_index(&body.name, body.primary_key.clone()) { + Ok(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + error!("error creating index: {}", e); + unimplemented!() + } + } } #[derive(Debug, Deserialize)]