mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 23:04:26 +01:00
Merge pull request #41 from meilisearch/update-store-delete-updates
Allow users to abort pending updates
This commit is contained in:
commit
92f253adb2
@ -78,6 +78,12 @@ $(window).on('load', function () {
|
|||||||
const content = $(`#${id} .updateStatus.content`);
|
const content = $(`#${id} .updateStatus.content`);
|
||||||
content.html('processed ' + JSON.stringify(status.meta));
|
content.html('processed ' + JSON.stringify(status.meta));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (status.type == "Aborted") {
|
||||||
|
const id = 'update-' + status.update_id;
|
||||||
|
const content = $(`#${id} .updateStatus.content`);
|
||||||
|
content.html('aborted ' + JSON.stringify(status.meta));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -189,6 +189,18 @@ enum UpdateStatus<M, P, N> {
|
|||||||
Pending { update_id: u64, meta: M },
|
Pending { update_id: u64, meta: M },
|
||||||
Progressing { update_id: u64, meta: P },
|
Progressing { update_id: u64, meta: P },
|
||||||
Processed { update_id: u64, meta: N },
|
Processed { update_id: u64, meta: N },
|
||||||
|
Aborted { update_id: u64, meta: M },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, P, N> UpdateStatus<M, P, N> {
|
||||||
|
fn update_id(&self) -> u64 {
|
||||||
|
match self {
|
||||||
|
UpdateStatus::Pending { update_id, .. } => *update_id,
|
||||||
|
UpdateStatus::Progressing { update_id, .. } => *update_id,
|
||||||
|
UpdateStatus::Processed { update_id, .. } => *update_id,
|
||||||
|
UpdateStatus::Aborted { update_id, .. } => *update_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
@ -473,12 +485,16 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.and(warp::path!("updates"))
|
.and(warp::path!("updates"))
|
||||||
.map(move |header: String| {
|
.map(move |header: String| {
|
||||||
let update_store = update_store_cloned.clone();
|
let update_store = update_store_cloned.clone();
|
||||||
let mut updates = update_store.iter_metas(|processed, pending| {
|
let mut updates = update_store.iter_metas(|processed, aborted, pending| {
|
||||||
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
|
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
|
||||||
for result in processed {
|
for result in processed {
|
||||||
let (uid, meta) = result?;
|
let (uid, meta) = result?;
|
||||||
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
|
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
|
||||||
}
|
}
|
||||||
|
for result in aborted {
|
||||||
|
let (uid, meta) = result?;
|
||||||
|
updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
|
||||||
|
}
|
||||||
for result in pending {
|
for result in pending {
|
||||||
let (uid, meta) = result?;
|
let (uid, meta) = result?;
|
||||||
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
|
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
|
||||||
@ -486,9 +502,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
Ok(updates)
|
Ok(updates)
|
||||||
}).unwrap();
|
}).unwrap();
|
||||||
|
|
||||||
if header.contains("text/html") {
|
updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse());
|
||||||
updates.reverse();
|
|
||||||
|
|
||||||
|
if header.contains("text/html") {
|
||||||
// We retrieve the database size.
|
// We retrieve the database size.
|
||||||
let db_size = File::open(lmdb_path_cloned.clone())
|
let db_size = File::open(lmdb_path_cloned.clone())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -798,6 +814,31 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
warp::reply()
|
warp::reply()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let update_store_cloned = update_store.clone();
|
||||||
|
let update_status_sender_cloned = update_status_sender.clone();
|
||||||
|
let abort_update_id_route = warp::filters::method::delete()
|
||||||
|
.and(warp::path!("update" / u64))
|
||||||
|
.map(move |update_id: u64| {
|
||||||
|
if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() {
|
||||||
|
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
|
||||||
|
eprintln!("update {} aborted", update_id);
|
||||||
|
}
|
||||||
|
warp::reply()
|
||||||
|
});
|
||||||
|
|
||||||
|
let update_store_cloned = update_store.clone();
|
||||||
|
let update_status_sender_cloned = update_status_sender.clone();
|
||||||
|
let abort_pending_updates_route = warp::filters::method::delete()
|
||||||
|
.and(warp::path!("updates"))
|
||||||
|
.map(move || {
|
||||||
|
let updates = update_store_cloned.abort_pendings().unwrap();
|
||||||
|
for (update_id, meta) in updates {
|
||||||
|
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
|
||||||
|
eprintln!("update {} aborted", update_id);
|
||||||
|
}
|
||||||
|
warp::reply()
|
||||||
|
});
|
||||||
|
|
||||||
let update_ws_route = warp::ws()
|
let update_ws_route = warp::ws()
|
||||||
.and(warp::path!("updates" / "ws"))
|
.and(warp::path!("updates" / "ws"))
|
||||||
.map(move |ws: warp::ws::Ws| {
|
.map(move |ws: warp::ws::Ws| {
|
||||||
@ -844,6 +885,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.or(indexing_csv_route)
|
.or(indexing_csv_route)
|
||||||
.or(indexing_json_route)
|
.or(indexing_json_route)
|
||||||
.or(indexing_json_stream_route)
|
.or(indexing_json_stream_route)
|
||||||
|
.or(abort_update_id_route)
|
||||||
|
.or(abort_pending_updates_route)
|
||||||
.or(clearing_route)
|
.or(clearing_route)
|
||||||
.or(change_settings_route)
|
.or(change_settings_route)
|
||||||
.or(change_facet_levels_route)
|
.or(change_facet_levels_route)
|
||||||
|
@ -72,6 +72,15 @@
|
|||||||
</li>
|
</li>
|
||||||
</ol>
|
</ol>
|
||||||
</li>
|
</li>
|
||||||
|
{% when UpdateStatus::Aborted with { update_id, meta } %}
|
||||||
|
<li id="update-{{ update_id }}" class="document">
|
||||||
|
<ol>
|
||||||
|
<li class="field">
|
||||||
|
<div class="attribute">update id</div><div class="updateId content">{{ update_id }}</div>
|
||||||
|
<div class="attribute">update status</div><div class="updateStatus content">aborted</div>
|
||||||
|
</li>
|
||||||
|
</ol>
|
||||||
|
</li>
|
||||||
{% else %}
|
{% else %}
|
||||||
{% endmatch %}
|
{% endmatch %}
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
@ -7,6 +7,7 @@ mod index;
|
|||||||
mod mdfs;
|
mod mdfs;
|
||||||
mod query_tokens;
|
mod query_tokens;
|
||||||
mod search;
|
mod search;
|
||||||
|
mod update_store;
|
||||||
pub mod facet;
|
pub mod facet;
|
||||||
pub mod heed_codec;
|
pub mod heed_codec;
|
||||||
pub mod proximity;
|
pub mod proximity;
|
||||||
@ -25,13 +26,11 @@ use serde_json::{Map, Value};
|
|||||||
pub use self::criterion::{Criterion, default_criteria};
|
pub use self::criterion::{Criterion, default_criteria};
|
||||||
pub use self::external_documents_ids::ExternalDocumentsIds;
|
pub use self::external_documents_ids::ExternalDocumentsIds;
|
||||||
pub use self::fields_ids_map::FieldsIdsMap;
|
pub use self::fields_ids_map::FieldsIdsMap;
|
||||||
|
pub use self::heed_codec::{BEU32StrCodec, StrStrU8Codec, ObkvCodec};
|
||||||
|
pub use self::heed_codec::{RoaringBitmapCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec};
|
||||||
pub use self::index::Index;
|
pub use self::index::Index;
|
||||||
pub use self::search::{Search, FacetCondition, SearchResult};
|
pub use self::search::{Search, FacetCondition, SearchResult};
|
||||||
pub use self::heed_codec::{
|
pub use self::update_store::UpdateStore;
|
||||||
RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec,
|
|
||||||
ObkvCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec,
|
|
||||||
};
|
|
||||||
pub use self::update::UpdateStore;
|
|
||||||
|
|
||||||
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
|
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
|
||||||
pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
|
pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
|
||||||
|
@ -6,7 +6,6 @@ mod index_documents;
|
|||||||
mod settings;
|
mod settings;
|
||||||
mod update_builder;
|
mod update_builder;
|
||||||
mod update_step;
|
mod update_step;
|
||||||
mod update_store;
|
|
||||||
|
|
||||||
pub use self::available_documents_ids::AvailableDocumentsIds;
|
pub use self::available_documents_ids::AvailableDocumentsIds;
|
||||||
pub use self::clear_documents::ClearDocuments;
|
pub use self::clear_documents::ClearDocuments;
|
||||||
@ -16,4 +15,3 @@ pub use self::facets::Facets;
|
|||||||
pub use self::settings::Settings;
|
pub use self::settings::Settings;
|
||||||
pub use self::update_builder::UpdateBuilder;
|
pub use self::update_builder::UpdateBuilder;
|
||||||
pub use self::update_step::UpdateIndexingStep;
|
pub use self::update_step::UpdateIndexingStep;
|
||||||
pub use self::update_store::UpdateStore;
|
|
||||||
|
@ -14,6 +14,7 @@ pub struct UpdateStore<M, N> {
|
|||||||
pending_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
|
pending_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
|
||||||
pending: Database<OwnedType<BEU64>, ByteSlice>,
|
pending: Database<OwnedType<BEU64>, ByteSlice>,
|
||||||
processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>,
|
processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>,
|
||||||
|
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
|
||||||
notification_sender: Sender<()>,
|
notification_sender: Sender<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,11 +30,12 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
M: for<'a> Deserialize<'a>,
|
M: for<'a> Deserialize<'a>,
|
||||||
N: Serialize,
|
N: Serialize,
|
||||||
{
|
{
|
||||||
options.max_dbs(3);
|
options.max_dbs(4);
|
||||||
let env = options.open(path)?;
|
let env = options.open(path)?;
|
||||||
let pending_meta = env.create_database(Some("pending-meta"))?;
|
let pending_meta = env.create_database(Some("pending-meta"))?;
|
||||||
let pending = env.create_database(Some("pending"))?;
|
let pending = env.create_database(Some("pending"))?;
|
||||||
let processed_meta = env.create_database(Some("processed-meta"))?;
|
let processed_meta = env.create_database(Some("processed-meta"))?;
|
||||||
|
let aborted_meta = env.create_database(Some("aborted-meta"))?;
|
||||||
|
|
||||||
let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
|
let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
|
||||||
// Send a first notification to trigger the process.
|
// Send a first notification to trigger the process.
|
||||||
@ -44,6 +46,7 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
pending,
|
pending,
|
||||||
pending_meta,
|
pending_meta,
|
||||||
processed_meta,
|
processed_meta,
|
||||||
|
aborted_meta,
|
||||||
notification_sender,
|
notification_sender,
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -67,20 +70,27 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
/// Returns the new biggest id to use to store the new update.
|
/// Returns the new biggest id to use to store the new update.
|
||||||
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
|
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
|
||||||
let last_pending = self.pending_meta
|
let last_pending = self.pending_meta
|
||||||
.as_polymorph()
|
.remap_data_type::<DecodeIgnore>()
|
||||||
.last::<_, OwnedType<BEU64>, DecodeIgnore>(txn)?
|
.last(txn)?
|
||||||
.map(|(k, _)| k.get());
|
.map(|(k, _)| k.get());
|
||||||
|
|
||||||
if let Some(last_id) = last_pending {
|
|
||||||
return Ok(last_id + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
let last_processed = self.processed_meta
|
let last_processed = self.processed_meta
|
||||||
.as_polymorph()
|
.remap_data_type::<DecodeIgnore>()
|
||||||
.last::<_, OwnedType<BEU64>, DecodeIgnore>(txn)?
|
.last(txn)?
|
||||||
.map(|(k, _)| k.get());
|
.map(|(k, _)| k.get());
|
||||||
|
|
||||||
match last_processed {
|
let last_aborted = self.aborted_meta
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.last(txn)?
|
||||||
|
.map(|(k, _)| k.get());
|
||||||
|
|
||||||
|
let last_update_id = [last_pending, last_processed, last_aborted]
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.flatten()
|
||||||
|
.max();
|
||||||
|
|
||||||
|
match last_update_id {
|
||||||
Some(last_id) => Ok(last_id + 1),
|
Some(last_id) => Ok(last_id + 1),
|
||||||
None => Ok(0),
|
None => Ok(0),
|
||||||
}
|
}
|
||||||
@ -152,8 +162,21 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute the user defined function with both meta-store iterators, the first
|
/// The id and metadata of the update that is currently being processed,
|
||||||
/// iterator is the *processed* meta one and the secind is the *pending* meta one.
|
/// `None` if no update is being processed.
|
||||||
|
pub fn processing_update(&self) -> heed::Result<Option<(u64, M)>>
|
||||||
|
where M: for<'a> Deserialize<'a>,
|
||||||
|
{
|
||||||
|
let rtxn = self.env.read_txn()?;
|
||||||
|
match self.pending_meta.first(&rtxn)? {
|
||||||
|
Some((key, meta)) => Ok(Some((key.get(), meta))),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute the user defined function with the meta-store iterators, the first
|
||||||
|
/// iterator is the *processed* meta one, the second the *aborted* meta one
|
||||||
|
/// and, the last is the *pending* meta one.
|
||||||
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
|
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
|
||||||
where
|
where
|
||||||
M: for<'a> Deserialize<'a>,
|
M: for<'a> Deserialize<'a>,
|
||||||
@ -161,19 +184,21 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
F: for<'a> FnMut(
|
F: for<'a> FnMut(
|
||||||
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<N>>,
|
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<N>>,
|
||||||
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>,
|
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>,
|
||||||
|
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>,
|
||||||
) -> heed::Result<T>,
|
) -> heed::Result<T>,
|
||||||
{
|
{
|
||||||
let rtxn = self.env.read_txn()?;
|
let rtxn = self.env.read_txn()?;
|
||||||
|
|
||||||
// We get both the pending and processed meta iterators.
|
// We get the pending, processed and aborted meta iterators.
|
||||||
let processed_iter = self.processed_meta.iter(&rtxn)?;
|
let processed_iter = self.processed_meta.iter(&rtxn)?;
|
||||||
|
let aborted_iter = self.aborted_meta.iter(&rtxn)?;
|
||||||
let pending_iter = self.pending_meta.iter(&rtxn)?;
|
let pending_iter = self.pending_meta.iter(&rtxn)?;
|
||||||
|
|
||||||
// We execute the user defined function with both iterators.
|
// We execute the user defined function with both iterators.
|
||||||
(f)(processed_iter, pending_iter)
|
(f)(processed_iter, aborted_iter, pending_iter)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the update associated meta or `None` if the update deosn't exist.
|
/// Returns the update associated meta or `None` if the update doesn't exist.
|
||||||
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatusMeta<M, N>>>
|
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatusMeta<M, N>>>
|
||||||
where
|
where
|
||||||
M: for<'a> Deserialize<'a>,
|
M: for<'a> Deserialize<'a>,
|
||||||
@ -186,10 +211,73 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
return Ok(Some(UpdateStatusMeta::Pending(meta)));
|
return Ok(Some(UpdateStatusMeta::Pending(meta)));
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.processed_meta.get(&rtxn, &key)? {
|
if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
|
||||||
Some(meta) => Ok(Some(UpdateStatusMeta::Processed(meta))),
|
return Ok(Some(UpdateStatusMeta::Processed(meta)));
|
||||||
None => Ok(None),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
|
||||||
|
return Ok(Some(UpdateStatusMeta::Aborted(meta)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Aborts an update, an aborted update content is deleted and
|
||||||
|
/// the meta of it is moved into the aborted updates database.
|
||||||
|
///
|
||||||
|
/// Trying to abort an update that is currently being processed, an update
|
||||||
|
/// that as already been processed or which doesn't actually exist, will
|
||||||
|
/// return `None`.
|
||||||
|
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<M>>
|
||||||
|
where M: Serialize + for<'a> Deserialize<'a>,
|
||||||
|
{
|
||||||
|
let mut wtxn = self.env.write_txn()?;
|
||||||
|
let key = BEU64::new(update_id);
|
||||||
|
|
||||||
|
// We cannot abort an update that is currently being processed.
|
||||||
|
if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let meta = match self.pending_meta.get(&wtxn, &key)? {
|
||||||
|
Some(meta) => meta,
|
||||||
|
None => return Ok(None),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.aborted_meta.put(&mut wtxn, &key, &meta)?;
|
||||||
|
self.pending_meta.delete(&mut wtxn, &key)?;
|
||||||
|
self.pending.delete(&mut wtxn, &key)?;
|
||||||
|
|
||||||
|
wtxn.commit()?;
|
||||||
|
|
||||||
|
Ok(Some(meta))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Aborts all the pending updates, and not the one being currently processed.
|
||||||
|
/// Returns the update metas and ids that were successfully aborted.
|
||||||
|
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, M)>>
|
||||||
|
where M: Serialize + for<'a> Deserialize<'a>,
|
||||||
|
{
|
||||||
|
let mut wtxn = self.env.write_txn()?;
|
||||||
|
let mut aborted_updates = Vec::new();
|
||||||
|
|
||||||
|
// We skip the first pending update as it is currently being processed.
|
||||||
|
for result in self.pending_meta.iter(&wtxn)?.skip(1) {
|
||||||
|
let (key, meta) = result?;
|
||||||
|
let id = key.get();
|
||||||
|
aborted_updates.push((id, meta));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (id, meta) in &aborted_updates {
|
||||||
|
let key = BEU64::new(*id);
|
||||||
|
self.aborted_meta.put(&mut wtxn, &key, &meta)?;
|
||||||
|
self.pending_meta.delete(&mut wtxn, &key)?;
|
||||||
|
self.pending.delete(&mut wtxn, &key)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
wtxn.commit()?;
|
||||||
|
|
||||||
|
Ok(aborted_updates)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,6 +285,7 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
|
|||||||
pub enum UpdateStatusMeta<M, N> {
|
pub enum UpdateStatusMeta<M, N> {
|
||||||
Pending(M),
|
Pending(M),
|
||||||
Processed(N),
|
Processed(N),
|
||||||
|
Aborted(M),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
Loading…
Reference in New Issue
Block a user