diff --git a/src/data/updates.rs b/src/data/updates.rs index 06aed8faa..57e9d1864 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -6,7 +6,7 @@ use futures_util::stream::StreamExt; use tokio::io::AsyncWriteExt; use super::Data; -use crate::index_controller::{IndexController, Settings}; +use crate::index_controller::{IndexController, Settings, IndexSettings, IndexMetadata}; use crate::index_controller::UpdateStatus; impl Data { @@ -70,4 +70,18 @@ impl Data { pub fn get_updates_status(&self, index: impl AsRef) -> anyhow::Result> { self.index_controller.all_update_status(index) } + + pub fn update_index( + &self, + name: impl AsRef, + primary_key: Option>, + new_name: Option> + ) -> anyhow::Result { + let settings = IndexSettings { + name: new_name.map(|s| s.as_ref().to_string()), + primary_key: primary_key.map(|s| s.as_ref().to_string()), + }; + + self.index_controller.update_index(name, settings) + } } diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index d28228efd..36c864772 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -24,6 +24,7 @@ pub struct IndexMeta { index_store_size: u64, pub uuid: Uuid, pub created_at: DateTime, + pub updated_at: DateTime, } impl IndexMeta { @@ -131,6 +132,52 @@ impl IndexStore { self.get_index_txn(&txn, name) } + /// Use this function to perform an update on an index. + /// This function also puts a lock on what index in allowed to perform an update. + pub fn update_index(&self, name: impl AsRef, f: F) -> anyhow::Result<(T, IndexMeta)> + where + F: FnOnce(&Index) -> anyhow::Result, + { + let mut txn = self.env.write_txn()?; + let (index, _) = self.get_index_txn(&txn, &name)? + .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; + let result = f(index.as_ref()); + match result { + Ok(ret) => { + let meta = self.update_meta(&mut txn, name, |meta| meta.updated_at = Utc::now())?; + txn.commit()?; + Ok((ret, meta)) + } + Err(e) => Err(e) + } + } + + pub fn index_with_meta(&self, name: impl AsRef) -> anyhow::Result, IndexMeta)>> { + let txn = self.env.read_txn()?; + let uuid = self.index_uuid(&txn, &name)?; + match uuid { + Some(uuid) => { + let meta = self.uuid_to_index_meta.get(&txn, uuid.as_bytes())? + .with_context(|| format!("unable to retrieve metadata for index {:?}", name.as_ref()))?; + let (index, _) = self.retrieve_index(&txn, uuid)? + .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?; + Ok(Some((index, meta))) + } + None => Ok(None), + } + } + + fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef, f: impl FnOnce(&mut IndexMeta)) -> anyhow::Result { + let uuid = self.index_uuid(txn, &name)? + .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; + let mut meta = self.uuid_to_index_meta + .get(txn, uuid.as_bytes())? + .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?; + f(&mut meta); + self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; + Ok(meta) + } + pub fn get_or_create_index( &self, name: impl AsRef, @@ -173,7 +220,14 @@ impl IndexStore { index_store_size: u64, ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { let created_at = Utc::now(); - let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at }; + let updated_at = created_at; + let meta = IndexMeta { + update_store_size, + index_store_size, + uuid: uuid.clone(), + created_at, + updated_at, + }; self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?; self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; @@ -318,11 +372,15 @@ mod test { let txn = store.env.read_txn().unwrap(); assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); + let created_at = Utc::now(); + let updated_at = created_at; + let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone(), - created_at: Utc::now(), + created_at, + updated_at, }; let mut txn = store.env.write_txn().unwrap(); store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); @@ -344,12 +402,16 @@ mod test { assert!(store.index(&name).unwrap().is_none()); + let created_at = Utc::now(); + let updated_at = created_at; + let uuid = Uuid::new_v4(); let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone(), - created_at: Utc::now(), + created_at, + updated_at, }; let mut txn = store.env.write_txn().unwrap(); store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 60a02573f..62055f1a8 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -144,6 +144,44 @@ impl IndexController for LocalIndexController { } Ok(output_meta) } + + fn update_index(&self, name: impl AsRef, index_settings: IndexSettings) -> anyhow::Result { + if index_settings.name.is_some() { + bail!("can't udpate an index name.") + } + + let (primary_key, meta) = match index_settings.primary_key { + Some(ref primary_key) => { + self.indexes + .update_index(&name, |index| { + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + bail!("primary key already exists.") + } + index.put_primary_key(&mut txn, primary_key)?; + txn.commit()?; + Ok(Some(primary_key.clone())) + })? + }, + None => { + let (index, meta) = self.indexes + .index_with_meta(&name)? + .with_context(|| format!("index {:?} doesn't exist.", name.as_ref()))?; + let primary_key = index + .primary_key(&index.read_txn()?)? + .map(String::from); + (primary_key, meta) + }, + }; + + Ok(IndexMetadata { + name: name.as_ref().to_string(), + uuid: meta.uuid.clone(), + created_at: meta.created_at, + updated_at: meta.updated_at, + primary_key, + }) + } } fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 2c8dd1226..a0f8476ff 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -10,9 +10,7 @@ use std::sync::Arc; use anyhow::Result; use chrono::{DateTime, Utc}; use milli::Index; -use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; -use serde::{Serialize, Deserialize, de::Deserializer}; -use uuid::Uuid; +use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; @@ -155,10 +153,16 @@ pub trait IndexController { /// Returns, if it exists, the `Index` with the povided name. fn index(&self, name: impl AsRef) -> anyhow::Result>>; + /// Returns the udpate status an update fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>; + + /// Returns all the udpate status for an index fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>; + /// List all the indexes fn list_indexes(&self) -> anyhow::Result>; + + fn update_index(&self, name: impl AsRef, index_settings: IndexSettings) -> anyhow::Result; } diff --git a/src/routes/index.rs b/src/routes/index.rs index d682376e3..395811774 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -94,11 +94,17 @@ struct UpdateIndexResponse { #[put("/indexes/{index_uid}", wrap = "Authentication::Private")] async fn update_index( - _data: web::Data, - _path: web::Path, - _body: web::Json, + data: web::Data, + path: web::Path, + body: web::Json, ) -> Result { - todo!() + match data.update_index(&path.index_uid, body.primary_key.as_ref(), body.name.as_ref()) { + Ok(meta) => { + let json = serde_json::to_string(&meta).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(_) => { todo!() } + } } #[delete("/indexes/{index_uid}", wrap = "Authentication::Private")]