diff --git a/http-ui/public/updates-script.js b/http-ui/public/updates-script.js index 5d439a7f5..bb91de313 100644 --- a/http-ui/public/updates-script.js +++ b/http-ui/public/updates-script.js @@ -78,6 +78,12 @@ $(window).on('load', function () { const content = $(`#${id} .updateStatus.content`); content.html('processed ' + JSON.stringify(status.meta)); } + + if (status.type == "Aborted") { + const id = 'update-' + status.update_id; + const content = $(`#${id} .updateStatus.content`); + content.html('aborted ' + JSON.stringify(status.meta)); + } } }); diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 80402f0a0..62d3d75bd 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -189,6 +189,18 @@ enum UpdateStatus { Pending { update_id: u64, meta: M }, Progressing { update_id: u64, meta: P }, Processed { update_id: u64, meta: N }, + Aborted { update_id: u64, meta: M }, +} + +impl UpdateStatus { + fn update_id(&self) -> u64 { + match self { + UpdateStatus::Pending { update_id, .. } => *update_id, + UpdateStatus::Progressing { update_id, .. } => *update_id, + UpdateStatus::Processed { update_id, .. } => *update_id, + UpdateStatus::Aborted { update_id, .. } => *update_id, + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -473,12 +485,16 @@ async fn main() -> anyhow::Result<()> { .and(warp::path!("updates")) .map(move |header: String| { let update_store = update_store_cloned.clone(); - let mut updates = update_store.iter_metas(|processed, pending| { + let mut updates = update_store.iter_metas(|processed, aborted, pending| { let mut updates = Vec::>::new(); for result in processed { let (uid, meta) = result?; updates.push(UpdateStatus::Processed { update_id: uid.get(), meta }); } + for result in aborted { + let (uid, meta) = result?; + updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta }); + } for result in pending { let (uid, meta) = result?; updates.push(UpdateStatus::Pending { update_id: uid.get(), meta }); @@ -486,9 +502,9 @@ async fn main() -> anyhow::Result<()> { Ok(updates) }).unwrap(); - if header.contains("text/html") { - updates.reverse(); + updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse()); + if header.contains("text/html") { // We retrieve the database size. let db_size = File::open(lmdb_path_cloned.clone()) .unwrap() @@ -798,6 +814,31 @@ async fn main() -> anyhow::Result<()> { warp::reply() }); + let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); + let abort_update_id_route = warp::filters::method::delete() + .and(warp::path!("update" / u64)) + .map(move |update_id: u64| { + if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() { + let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta }); + eprintln!("update {} aborted", update_id); + } + warp::reply() + }); + + let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); + let abort_pending_updates_route = warp::filters::method::delete() + .and(warp::path!("updates")) + .map(move || { + let updates = update_store_cloned.abort_pendings().unwrap(); + for (update_id, meta) in updates { + let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta }); + eprintln!("update {} aborted", update_id); + } + warp::reply() + }); + let update_ws_route = warp::ws() .and(warp::path!("updates" / "ws")) .map(move |ws: warp::ws::Ws| { @@ -844,6 +885,8 @@ async fn main() -> anyhow::Result<()> { .or(indexing_csv_route) .or(indexing_json_route) .or(indexing_json_stream_route) + .or(abort_update_id_route) + .or(abort_pending_updates_route) .or(clearing_route) .or(change_settings_route) .or(change_facet_levels_route) diff --git a/http-ui/templates/updates.html b/http-ui/templates/updates.html index 271394c92..514a006b3 100644 --- a/http-ui/templates/updates.html +++ b/http-ui/templates/updates.html @@ -72,6 +72,15 @@ + {% when UpdateStatus::Aborted with { update_id, meta } %} +
  • +
      +
    1. +
      update id
      {{ update_id }}
      +
      update status
      aborted
      +
    2. +
    +
  • {% else %} {% endmatch %} {% endfor %} diff --git a/src/lib.rs b/src/lib.rs index 320077b86..93f9cc0df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ mod index; mod mdfs; mod query_tokens; mod search; +mod update_store; pub mod facet; pub mod heed_codec; pub mod proximity; @@ -25,13 +26,11 @@ use serde_json::{Map, Value}; pub use self::criterion::{Criterion, default_criteria}; pub use self::external_documents_ids::ExternalDocumentsIds; pub use self::fields_ids_map::FieldsIdsMap; +pub use self::heed_codec::{BEU32StrCodec, StrStrU8Codec, ObkvCodec}; +pub use self::heed_codec::{RoaringBitmapCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec}; pub use self::index::Index; pub use self::search::{Search, FacetCondition, SearchResult}; -pub use self::heed_codec::{ - RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec, - ObkvCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec, -}; -pub use self::update::UpdateStore; +pub use self::update_store::UpdateStore; pub type FastMap4 = HashMap>; pub type FastMap8 = HashMap>; diff --git a/src/update/mod.rs b/src/update/mod.rs index d05396f00..407d9f498 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -6,7 +6,6 @@ mod index_documents; mod settings; mod update_builder; mod update_step; -mod update_store; pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::clear_documents::ClearDocuments; @@ -16,4 +15,3 @@ pub use self::facets::Facets; pub use self::settings::Settings; pub use self::update_builder::UpdateBuilder; pub use self::update_step::UpdateIndexingStep; -pub use self::update_store::UpdateStore; diff --git a/src/update/update_store.rs b/src/update_store.rs similarity index 69% rename from src/update/update_store.rs rename to src/update_store.rs index de07f1e21..9ffa2a7bc 100644 --- a/src/update/update_store.rs +++ b/src/update_store.rs @@ -14,6 +14,7 @@ pub struct UpdateStore { pending_meta: Database, SerdeJson>, pending: Database, ByteSlice>, processed_meta: Database, SerdeJson>, + aborted_meta: Database, SerdeJson>, notification_sender: Sender<()>, } @@ -29,11 +30,12 @@ impl UpdateStore { M: for<'a> Deserialize<'a>, N: Serialize, { - options.max_dbs(3); + options.max_dbs(4); let env = options.open(path)?; let pending_meta = env.create_database(Some("pending-meta"))?; let pending = env.create_database(Some("pending"))?; let processed_meta = env.create_database(Some("processed-meta"))?; + let aborted_meta = env.create_database(Some("aborted-meta"))?; let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1); // Send a first notification to trigger the process. @@ -44,6 +46,7 @@ impl UpdateStore { pending, pending_meta, processed_meta, + aborted_meta, notification_sender, }); @@ -67,20 +70,27 @@ impl UpdateStore { /// Returns the new biggest id to use to store the new update. fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { let last_pending = self.pending_meta - .as_polymorph() - .last::<_, OwnedType, DecodeIgnore>(txn)? + .remap_data_type::() + .last(txn)? .map(|(k, _)| k.get()); - if let Some(last_id) = last_pending { - return Ok(last_id + 1); - } - let last_processed = self.processed_meta - .as_polymorph() - .last::<_, OwnedType, DecodeIgnore>(txn)? + .remap_data_type::() + .last(txn)? .map(|(k, _)| k.get()); - match last_processed { + let last_aborted = self.aborted_meta + .remap_data_type::() + .last(txn)? + .map(|(k, _)| k.get()); + + let last_update_id = [last_pending, last_processed, last_aborted] + .iter() + .copied() + .flatten() + .max(); + + match last_update_id { Some(last_id) => Ok(last_id + 1), None => Ok(0), } @@ -152,8 +162,21 @@ impl UpdateStore { } } - /// Execute the user defined function with both meta-store iterators, the first - /// iterator is the *processed* meta one and the secind is the *pending* meta one. + /// The id and metadata of the update that is currently being processed, + /// `None` if no update is being processed. + pub fn processing_update(&self) -> heed::Result> + where M: for<'a> Deserialize<'a>, + { + let rtxn = self.env.read_txn()?; + match self.pending_meta.first(&rtxn)? { + Some((key, meta)) => Ok(Some((key.get(), meta))), + None => Ok(None), + } + } + + /// Execute the user defined function with the meta-store iterators, the first + /// iterator is the *processed* meta one, the second the *aborted* meta one + /// and, the last is the *pending* meta one. pub fn iter_metas(&self, mut f: F) -> heed::Result where M: for<'a> Deserialize<'a>, @@ -161,19 +184,21 @@ impl UpdateStore { F: for<'a> FnMut( heed::RoIter<'a, OwnedType, SerdeJson>, heed::RoIter<'a, OwnedType, SerdeJson>, + heed::RoIter<'a, OwnedType, SerdeJson>, ) -> heed::Result, { let rtxn = self.env.read_txn()?; - // We get both the pending and processed meta iterators. + // We get the pending, processed and aborted meta iterators. let processed_iter = self.processed_meta.iter(&rtxn)?; + let aborted_iter = self.aborted_meta.iter(&rtxn)?; let pending_iter = self.pending_meta.iter(&rtxn)?; // We execute the user defined function with both iterators. - (f)(processed_iter, pending_iter) + (f)(processed_iter, aborted_iter, pending_iter) } - /// Returns the update associated meta or `None` if the update deosn't exist. + /// Returns the update associated meta or `None` if the update doesn't exist. pub fn meta(&self, update_id: u64) -> heed::Result>> where M: for<'a> Deserialize<'a>, @@ -186,10 +211,73 @@ impl UpdateStore { return Ok(Some(UpdateStatusMeta::Pending(meta))); } - match self.processed_meta.get(&rtxn, &key)? { - Some(meta) => Ok(Some(UpdateStatusMeta::Processed(meta))), - None => Ok(None), + if let Some(meta) = self.processed_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatusMeta::Processed(meta))); } + + if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? { + return Ok(Some(UpdateStatusMeta::Aborted(meta))); + } + + Ok(None) + } + + /// Aborts an update, an aborted update content is deleted and + /// the meta of it is moved into the aborted updates database. + /// + /// Trying to abort an update that is currently being processed, an update + /// that as already been processed or which doesn't actually exist, will + /// return `None`. + pub fn abort_update(&self, update_id: u64) -> heed::Result> + where M: Serialize + for<'a> Deserialize<'a>, + { + let mut wtxn = self.env.write_txn()?; + let key = BEU64::new(update_id); + + // We cannot abort an update that is currently being processed. + if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) { + return Ok(None); + } + + let meta = match self.pending_meta.get(&wtxn, &key)? { + Some(meta) => meta, + None => return Ok(None), + }; + + self.aborted_meta.put(&mut wtxn, &key, &meta)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + + wtxn.commit()?; + + Ok(Some(meta)) + } + + /// Aborts all the pending updates, and not the one being currently processed. + /// Returns the update metas and ids that were successfully aborted. + pub fn abort_pendings(&self) -> heed::Result> + where M: Serialize + for<'a> Deserialize<'a>, + { + let mut wtxn = self.env.write_txn()?; + let mut aborted_updates = Vec::new(); + + // We skip the first pending update as it is currently being processed. + for result in self.pending_meta.iter(&wtxn)?.skip(1) { + let (key, meta) = result?; + let id = key.get(); + aborted_updates.push((id, meta)); + } + + for (id, meta) in &aborted_updates { + let key = BEU64::new(*id); + self.aborted_meta.put(&mut wtxn, &key, &meta)?; + self.pending_meta.delete(&mut wtxn, &key)?; + self.pending.delete(&mut wtxn, &key)?; + } + + wtxn.commit()?; + + Ok(aborted_updates) } } @@ -197,6 +285,7 @@ impl UpdateStore { pub enum UpdateStatusMeta { Pending(M), Processed(N), + Aborted(M), } #[cfg(test)]