Discover and remove documents ids

This commit is contained in:
Kerollmops 2020-05-19 13:12:02 +02:00
parent 5bf15a4190
commit 3bca31856d
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 82 additions and 33 deletions

View File

@ -775,12 +775,12 @@ mod tests {
assert!(document.is_none()); assert!(document.is_none());
let document: Option<IgnoredAny> = index let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(7_900_334_843_754_999_545)) .document(&reader, None, DocumentId(0))
.unwrap(); .unwrap();
assert!(document.is_some()); assert!(document.is_some());
let document: Option<IgnoredAny> = index let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(8_367_468_610_878_465_872)) .document(&reader, None, DocumentId(1))
.unwrap(); .unwrap();
assert!(document.is_some()); assert!(document.is_some());
} }
@ -855,12 +855,12 @@ mod tests {
assert!(document.is_none()); assert!(document.is_none());
let document: Option<IgnoredAny> = index let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(7_900_334_843_754_999_545)) .document(&reader, None, DocumentId(0))
.unwrap(); .unwrap();
assert!(document.is_some()); assert!(document.is_some());
let document: Option<IgnoredAny> = index let document: Option<IgnoredAny> = index
.document(&reader, None, DocumentId(8_367_468_610_878_465_872)) .document(&reader, None, DocumentId(1))
.unwrap(); .unwrap();
assert!(document.is_some()); assert!(document.is_some());
@ -897,7 +897,7 @@ mod tests {
let reader = db.main_read_txn().unwrap(); let reader = db.main_read_txn().unwrap();
let document: Option<serde_json::Value> = index let document: Option<serde_json::Value> = index
.document(&reader, None, DocumentId(7_900_334_843_754_999_545)) .document(&reader, None, DocumentId(0))
.unwrap(); .unwrap();
let new_doc1 = serde_json::json!({ let new_doc1 = serde_json::json!({
@ -908,7 +908,7 @@ mod tests {
assert_eq!(document, Some(new_doc1)); assert_eq!(document, Some(new_doc1));
let document: Option<serde_json::Value> = index let document: Option<serde_json::Value> = index
.document(&reader, None, DocumentId(8_367_468_610_878_465_872)) .document(&reader, None, DocumentId(1))
.unwrap(); .unwrap();
let new_doc2 = serde_json::json!({ let new_doc2 = serde_json::json!({
@ -1080,14 +1080,14 @@ mod tests {
assert_matches!( assert_matches!(
iter.next(), iter.next(),
Some(Document { Some(Document {
id: DocumentId(7_900_334_843_754_999_545), id: DocumentId(0),
.. ..
}) })
); );
assert_matches!( assert_matches!(
iter.next(), iter.next(),
Some(Document { Some(Document {
id: DocumentId(8_367_468_610_878_465_872), id: DocumentId(1),
.. ..
}) })
); );

View File

@ -90,7 +90,16 @@ impl Main {
// We do an union of the old and new internal ids. // We do an union of the old and new internal ids.
let internal_ids = self.internal_ids(writer)?; let internal_ids = self.internal_ids(writer)?;
let internal_ids = sdset::duo::Union::new(&new_ids, &internal_ids).into_set_buf(); let internal_ids = sdset::duo::Union::new(&internal_ids, new_ids).into_set_buf();
self.put_internal_ids(writer, &internal_ids)
}
pub fn remove_internal_ids(self, writer: &mut heed::RwTxn<MainT>, ids: &sdset::Set<DocumentId>) -> ZResult<()> {
use sdset::SetOperation;
// We do a difference of the old and new internal ids.
let internal_ids = self.internal_ids(writer)?;
let internal_ids = sdset::duo::Difference::new(&internal_ids, ids).into_set_buf();
self.put_internal_ids(writer, &internal_ids) self.put_internal_ids(writer, &internal_ids)
} }
@ -101,10 +110,25 @@ impl Main {
pub fn merge_user_ids(self, writer: &mut heed::RwTxn<MainT>, new_ids: &fst::Map) -> ZResult<()> { pub fn merge_user_ids(self, writer: &mut heed::RwTxn<MainT>, new_ids: &fst::Map) -> ZResult<()> {
use fst::{Streamer, IntoStreamer}; use fst::{Streamer, IntoStreamer};
// Do an union of the old and the new set of user ids.
let user_ids = self.user_ids(writer)?; let user_ids = self.user_ids(writer)?;
let mut op = user_ids.op().add(new_ids.into_stream()).r#union();
let mut build = fst::MapBuilder::memory();
while let Some((userid, values)) = op.next() {
build.insert(userid, values[0].value).unwrap();
}
let user_ids = build.into_inner().unwrap();
// TODO prefer using self.put_user_ids
self.main.put::<_, Str, ByteSlice>(writer, USER_IDS_KEY, user_ids.as_slice())
}
pub fn remove_user_ids(self, writer: &mut heed::RwTxn<MainT>, ids: &fst::Map) -> ZResult<()> {
use fst::{Streamer, IntoStreamer};
// Do an union of the old and the new set of user ids. // Do an union of the old and the new set of user ids.
let mut op = user_ids.op().add(new_ids.into_stream()).r#union(); let user_ids = self.user_ids(writer)?;
let mut op = user_ids.op().add(ids.into_stream()).difference();
let mut build = fst::MapBuilder::memory(); let mut build = fst::MapBuilder::memory();
while let Some((userid, values)) = op.next() { while let Some((userid, values)) = op.next() {
build.insert(userid, values[0].value).unwrap(); build.insert(userid, values[0].value).unwrap();
@ -127,6 +151,11 @@ impl Main {
} }
} }
pub fn user_to_internal_id(self, reader: &heed::RoTxn<MainT>, userid: &str) -> ZResult<Option<DocumentId>> {
let user_ids = self.user_ids(reader)?;
Ok(user_ids.get(userid).map(DocumentId))
}
pub fn put_words_fst(self, writer: &mut heed::RwTxn<MainT>, fst: &fst::Set) -> ZResult<()> { pub fn put_words_fst(self, writer: &mut heed::RwTxn<MainT>, fst: &fst::Set) -> ZResult<()> {
self.main.put::<_, Str, ByteSlice>(writer, WORDS_KEY, fst.as_fst().as_bytes()) self.main.put::<_, Str, ByteSlice>(writer, WORDS_KEY, fst.as_fst().as_bytes())
} }

View File

@ -7,6 +7,8 @@ pub fn apply_clear_all(
index: &store::Index, index: &store::Index,
) -> MResult<()> { ) -> MResult<()> {
index.main.put_words_fst(writer, &fst::Set::default())?; index.main.put_words_fst(writer, &fst::Set::default())?;
index.main.put_user_ids(writer, &fst::Map::default())?;
index.main.put_internal_ids(writer, &sdset::SetBuf::default())?;
index.main.put_ranked_map(writer, &RankedMap::default())?; index.main.put_ranked_map(writer, &RankedMap::default())?;
index.main.put_number_of_documents(writer, |_| 0)?; index.main.put_number_of_documents(writer, |_| 0)?;
index.documents_fields.clear(writer)?; index.documents_fields.clear(writer)?;

View File

@ -190,9 +190,9 @@ pub fn apply_addition<'a, 'b>(
documents_additions.insert(document_id, document); documents_additions.insert(document_id, document);
} }
// 2. remove the documents posting lists // 2. remove the documents postings lists
let number_of_inserted_documents = documents_additions.len(); let number_of_inserted_documents = documents_additions.len();
let documents_ids = documents_additions.iter().map(|(id, _)| *id).collect(); let documents_ids = new_user_ids.iter().map(|(userid, _)| userid.clone()).collect();
apply_documents_deletion(writer, index, documents_ids)?; apply_documents_deletion(writer, index, documents_ids)?;
let mut ranked_map = match index.main.ranked_map(writer)? { let mut ranked_map = match index.main.ranked_map(writer)? {

View File

@ -14,7 +14,7 @@ pub struct DocumentsDeletion {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: UpdateEventsEmitter, updates_notifier: UpdateEventsEmitter,
documents: Vec<DocumentId>, documents: Vec<String>,
} }
impl DocumentsDeletion { impl DocumentsDeletion {
@ -31,7 +31,7 @@ impl DocumentsDeletion {
} }
} }
pub fn delete_document_by_id(&mut self, document_id: DocumentId) { pub fn delete_document_by_user_id(&mut self, document_id: String) {
self.documents.push(document_id); self.documents.push(document_id);
} }
@ -47,8 +47,8 @@ impl DocumentsDeletion {
} }
} }
impl Extend<DocumentId> for DocumentsDeletion { impl Extend<String> for DocumentsDeletion {
fn extend<T: IntoIterator<Item = DocumentId>>(&mut self, iter: T) { fn extend<T: IntoIterator<Item=String>>(&mut self, iter: T) {
self.documents.extend(iter) self.documents.extend(iter)
} }
} }
@ -57,7 +57,7 @@ pub fn push_documents_deletion(
writer: &mut heed::RwTxn<UpdateT>, writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
deletion: Vec<DocumentId>, deletion: Vec<String>,
) -> MResult<u64> { ) -> MResult<u64> {
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
@ -70,10 +70,23 @@ pub fn push_documents_deletion(
pub fn apply_documents_deletion( pub fn apply_documents_deletion(
writer: &mut heed::RwTxn<MainT>, writer: &mut heed::RwTxn<MainT>,
index: &store::Index, index: &store::Index,
deletion: Vec<DocumentId>, deletion: Vec<String>,
) -> MResult<()> ) -> MResult<()>
{ {
unimplemented!("When we delete documents we must ask for user ids instead of internal ones"); let (user_ids, internal_ids) = {
let new_user_ids = SetBuf::from_dirty(deletion);
let mut internal_ids = Vec::new();
let user_ids = index.main.user_ids(writer)?;
for userid in new_user_ids.as_slice() {
if let Some(id) = user_ids.get(userid) {
internal_ids.push(DocumentId(id));
}
}
let new_user_ids = fst::Map::from_iter(new_user_ids.into_iter().map(|k| (k, 0))).unwrap();
(new_user_ids, SetBuf::from_dirty(internal_ids))
};
let schema = match index.main.schema(writer)? { let schema = match index.main.schema(writer)? {
Some(schema) => schema, Some(schema) => schema,
@ -87,16 +100,15 @@ pub fn apply_documents_deletion(
// facet filters deletion // facet filters deletion
if let Some(attributes_for_facetting) = index.main.attributes_for_faceting(writer)? { if let Some(attributes_for_facetting) = index.main.attributes_for_faceting(writer)? {
let facet_map = facets::facet_map_from_docids(writer, &index, &deletion, &attributes_for_facetting)?; let facet_map = facets::facet_map_from_docids(writer, &index, &internal_ids, &attributes_for_facetting)?;
index.facets.remove(writer, facet_map)?; index.facets.remove(writer, facet_map)?;
} }
// collect the ranked attributes according to the schema // collect the ranked attributes according to the schema
let ranked_fields = schema.ranked(); let ranked_fields = schema.ranked();
let idset = SetBuf::from_dirty(deletion);
let mut words_document_ids = HashMap::new(); let mut words_document_ids = HashMap::new();
for id in idset { for id in internal_ids.iter().cloned() {
// remove all the ranked attributes from the ranked_map // remove all the ranked attributes from the ranked_map
for ranked_attr in ranked_fields { for ranked_attr in ranked_fields {
ranked_map.remove(id, *ranked_attr); ranked_map.remove(id, *ranked_attr);
@ -166,6 +178,10 @@ pub fn apply_documents_deletion(
index.main.put_ranked_map(writer, &ranked_map)?; index.main.put_ranked_map(writer, &ranked_map)?;
index.main.put_number_of_documents(writer, |old| old - deleted_documents_len)?; index.main.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
// We apply the changes to the user and internal ids
index.main.remove_user_ids(writer, &user_ids)?;
index.main.remove_internal_ids(writer, &internal_ids)?;
compute_short_prefixes(writer, index)?; compute_short_prefixes(writer, index)?;
Ok(()) Ok(())

View File

@ -24,7 +24,7 @@ use sdset::Set;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::{store, DocumentId, MResult}; use crate::{store, MResult};
use crate::database::{MainT, UpdateT}; use crate::database::{MainT, UpdateT};
use crate::settings::SettingsUpdate; use crate::settings::SettingsUpdate;
@ -63,7 +63,7 @@ impl Update {
} }
} }
fn documents_deletion(data: Vec<DocumentId>) -> Update { fn documents_deletion(data: Vec<String>) -> Update {
Update { Update {
data: UpdateData::DocumentsDeletion(data), data: UpdateData::DocumentsDeletion(data),
enqueued_at: Utc::now(), enqueued_at: Utc::now(),
@ -84,7 +84,7 @@ pub enum UpdateData {
Customs(Vec<u8>), Customs(Vec<u8>),
DocumentsAddition(Vec<IndexMap<String, Value>>), DocumentsAddition(Vec<IndexMap<String, Value>>),
DocumentsPartial(Vec<IndexMap<String, Value>>), DocumentsPartial(Vec<IndexMap<String, Value>>),
DocumentsDeletion(Vec<DocumentId>), DocumentsDeletion(Vec<String>),
Settings(SettingsUpdate) Settings(SettingsUpdate)
} }

View File

@ -3,7 +3,7 @@ use std::collections::{BTreeSet, HashSet};
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use actix_web_macros::{delete, get, post, put}; use actix_web_macros::{delete, get, post, put};
use indexmap::IndexMap; use indexmap::IndexMap;
use meilisearch_core::{update, Error}; use meilisearch_core::update;
use serde::Deserialize; use serde::Deserialize;
use serde_json::Value; use serde_json::Value;
@ -43,11 +43,16 @@ async fn get_document(
.open_index(&path.index_uid) .open_index(&path.index_uid)
.ok_or(ResponseError::index_not_found(&path.index_uid))?; .ok_or(ResponseError::index_not_found(&path.index_uid))?;
let document_id = update::compute_document_id(&path.document_id).map_err(Error::Serializer)?;
let reader = data.db.main_read_txn()?; let reader = data.db.main_read_txn()?;
let internal_id = index.main.user_to_internal_id(&reader, &path.document_id)?;
let internal_id = match internal_id {
Some(internal_id) => internal_id,
None => return Err(ResponseError::document_not_found(&path.document_id)),
};
let response: Document = index let response: Document = index
.document(&reader, None, document_id)? .document(&reader, None, internal_id)?
.ok_or(ResponseError::document_not_found(&path.document_id))?; .ok_or(ResponseError::document_not_found(&path.document_id))?;
Ok(HttpResponse::Ok().json(response)) Ok(HttpResponse::Ok().json(response))
@ -66,12 +71,10 @@ async fn delete_document(
.open_index(&path.index_uid) .open_index(&path.index_uid)
.ok_or(ResponseError::index_not_found(&path.index_uid))?; .ok_or(ResponseError::index_not_found(&path.index_uid))?;
let document_id = update::compute_document_id(&path.document_id).map_err(Error::Serializer)?;
let mut update_writer = data.db.update_write_txn()?; let mut update_writer = data.db.update_write_txn()?;
let mut documents_deletion = index.documents_deletion(); let mut documents_deletion = index.documents_deletion();
documents_deletion.delete_document_by_id(document_id); documents_deletion.delete_document_by_user_id(path.document_id.clone());
let update_id = documents_deletion.finalize(&mut update_writer)?; let update_id = documents_deletion.finalize(&mut update_writer)?;
@ -239,8 +242,7 @@ async fn delete_documents(
for document_id in body.into_inner() { for document_id in body.into_inner() {
let document_id = update::value_to_string(&document_id); let document_id = update::value_to_string(&document_id);
let document_id = update::compute_document_id(&document_id).map_err(Error::Serializer)?; documents_deletion.delete_document_by_user_id(document_id);
documents_deletion.delete_document_by_id(document_id);
} }
let update_id = documents_deletion.finalize(&mut writer)?; let update_id = documents_deletion.finalize(&mut writer)?;