Compute and merge discovered ids

This commit is contained in:
Kerollmops 2020-05-19 11:45:46 +02:00
parent 016bfa391b
commit 5bf15a4190
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 75 additions and 20 deletions

1
Cargo.lock generated
View File

@ -1624,7 +1624,6 @@ dependencies = [
"sdset", "sdset",
"serde", "serde",
"serde_json", "serde_json",
"siphasher",
"slice-group-by", "slice-group-by",
"structopt", "structopt",
"tempfile", "tempfile",

View File

@ -35,7 +35,6 @@ regex = "1.3.6"
sdset = "0.4.0" sdset = "0.4.0"
serde = { version = "1.0.105", features = ["derive"] } serde = { version = "1.0.105", features = ["derive"] }
serde_json = { version = "1.0.50", features = ["preserve_order"] } serde_json = { version = "1.0.50", features = ["preserve_order"] }
siphasher = "0.3.2"
slice-group-by = "0.2.6" slice-group-by = "0.2.6"
unicase = "2.6.0" unicase = "2.6.0"
zerocopy = "0.3.0" zerocopy = "0.3.0"

View File

@ -85,10 +85,36 @@ impl Main {
} }
} }
/// Merges `new_ids` into the set of internal document ids stored in the
/// main store: reads the current set, computes its union with `new_ids`,
/// and writes the merged set back through `put_internal_ids`.
pub fn merge_internal_ids(self, writer: &mut heed::RwTxn<MainT>, new_ids: &sdset::Set<DocumentId>) -> ZResult<()> {
use sdset::SetOperation;
// We do an union of the old and new internal ids.
let internal_ids = self.internal_ids(writer)?;
let internal_ids = sdset::duo::Union::new(&new_ids, &internal_ids).into_set_buf();
self.put_internal_ids(writer, &internal_ids)
}
pub fn put_user_ids(self, writer: &mut heed::RwTxn<MainT>, ids: &fst::Map) -> ZResult<()> { pub fn put_user_ids(self, writer: &mut heed::RwTxn<MainT>, ids: &fst::Map) -> ZResult<()> {
self.main.put::<_, Str, ByteSlice>(writer, USER_IDS_KEY, ids.as_fst().as_bytes()) self.main.put::<_, Str, ByteSlice>(writer, USER_IDS_KEY, ids.as_fst().as_bytes())
} }
/// Merges the `new_ids` user-id → internal-id map into the map already
/// stored under `USER_IDS_KEY`, by streaming the union of the two fst maps
/// and rebuilding a single in-memory map that is written back.
pub fn merge_user_ids(self, writer: &mut heed::RwTxn<MainT>, new_ids: &fst::Map) -> ZResult<()> {
use fst::{Streamer, IntoStreamer};
let user_ids = self.user_ids(writer)?;
// Do an union of the old and the new set of user ids.
let mut op = user_ids.op().add(new_ids.into_stream()).r#union();
let mut build = fst::MapBuilder::memory();
while let Some((userid, values)) = op.next() {
// NOTE(review): for a key present in both maps, values[0] belongs to the
// first stream added to the op (the previously stored map), so the OLD
// internal id wins over the new one — confirm this is intentional.
build.insert(userid, values[0].value).unwrap();
}
let user_ids = build.into_inner().unwrap();
// TODO prefer using self.put_user_ids
self.main.put::<_, Str, ByteSlice>(writer, USER_IDS_KEY, user_ids.as_slice())
}
pub fn user_ids(self, reader: &heed::RoTxn<MainT>) -> ZResult<fst::Map> { pub fn user_ids(self, reader: &heed::RoTxn<MainT>) -> ZResult<fst::Map> {
match self.main.get::<_, Str, ByteSlice>(reader, USER_IDS_KEY)? { match self.main.get::<_, Str, ByteSlice>(reader, USER_IDS_KEY)? {
Some(bytes) => { Some(bytes) => {

View File

@ -1,4 +1,4 @@
use std::collections::HashMap; use std::collections::{HashMap, BTreeMap};
use fst::{set::OpBuilder, SetBuilder}; use fst::{set::OpBuilder, SetBuilder};
use indexmap::IndexMap; use indexmap::IndexMap;
@ -13,7 +13,7 @@ use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::facets; use crate::facets;
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::Deserializer; use crate::serde::Deserializer;
use crate::store::{self, DocumentsFields, DocumentsFieldsCounts}; use crate::store::{self, DocumentsFields, DocumentsFieldsCounts, DiscoverIds};
use crate::update::helpers::{index_value, value_to_number, extract_document_id}; use crate::update::helpers::{index_value, value_to_number, extract_document_id};
use crate::update::{apply_documents_deletion, compute_short_prefixes, next_update_id, Update}; use crate::update::{apply_documents_deletion, compute_short_prefixes, next_update_id, Update};
use crate::{Error, MResult, RankedMap}; use crate::{Error, MResult, RankedMap};
@ -150,17 +150,26 @@ pub fn apply_addition<'a, 'b>(
partial: bool partial: bool
) -> MResult<()> { ) -> MResult<()> {
let mut documents_additions = HashMap::new(); let mut documents_additions = HashMap::new();
let mut new_user_ids = BTreeMap::new();
let mut new_internal_ids = Vec::with_capacity(new_documents.len());
let mut schema = match index.main.schema(writer)? { let mut schema = match index.main.schema(writer)? {
Some(schema) => schema, Some(schema) => schema,
None => return Err(Error::SchemaMissing), None => return Err(Error::SchemaMissing),
}; };
// Retrieve the documents ids related structures
let user_ids = index.main.user_ids(writer)?;
let internal_ids = index.main.internal_ids(writer)?;
let mut available_ids = DiscoverIds::new(&internal_ids);
let primary_key = schema.primary_key().ok_or(Error::MissingPrimaryKey)?; let primary_key = schema.primary_key().ok_or(Error::MissingPrimaryKey)?;
// 1. store documents ids for future deletion // 1. store documents ids for future deletion
for mut document in new_documents { for mut document in new_documents {
let document_id = extract_document_id(&primary_key, &document)?; let (document_id, userid) = extract_document_id(&primary_key, &document, &user_ids, &mut available_ids)?;
new_user_ids.insert(userid, document_id.0);
new_internal_ids.push(document_id);
if partial { if partial {
let mut deserializer = Deserializer { let mut deserializer = Deserializer {
@ -233,6 +242,11 @@ pub fn apply_addition<'a, 'b>(
index.main.put_schema(writer, &schema)?; index.main.put_schema(writer, &schema)?;
let new_user_ids = fst::Map::from_iter(new_user_ids)?;
let new_internal_ids = sdset::SetBuf::from_dirty(new_internal_ids);
index.main.merge_user_ids(writer, &new_user_ids)?;
index.main.merge_internal_ids(writer, &new_internal_ids)?;
Ok(()) Ok(())
} }

View File

@ -71,7 +71,10 @@ pub fn apply_documents_deletion(
writer: &mut heed::RwTxn<MainT>, writer: &mut heed::RwTxn<MainT>,
index: &store::Index, index: &store::Index,
deletion: Vec<DocumentId>, deletion: Vec<DocumentId>,
) -> MResult<()> { ) -> MResult<()>
{
unimplemented!("When we delete documents we must ask for user ids instead of internal ones");
let schema = match index.main.schema(writer)? { let schema = match index.main.schema(writer)? {
Some(schema) => schema, Some(schema) => schema,
None => return Err(Error::SchemaMissing), None => return Err(Error::SchemaMissing),

View File

@ -1,16 +1,15 @@
use std::fmt::Write as _; use std::fmt::Write as _;
use std::hash::{Hash, Hasher};
use indexmap::IndexMap; use indexmap::IndexMap;
use meilisearch_schema::IndexedPos; use meilisearch_schema::IndexedPos;
use meilisearch_types::DocumentId; use meilisearch_types::DocumentId;
use ordered_float::OrderedFloat; use ordered_float::OrderedFloat;
use serde_json::Value; use serde_json::Value;
use siphasher::sip::SipHasher;
use crate::Number;
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::SerializerError; use crate::serde::SerializerError;
use crate::Number; use crate::store::DiscoverIds;
/// Returns the number of words indexed or `None` if the type is unindexable. /// Returns the number of words indexed or `None` if the type is unindexable.
pub fn index_value( pub fn index_value(
@ -96,28 +95,43 @@ pub fn value_to_number(value: &Value) -> Option<Number> {
} }
} }
/// Validates a string representation to be a correct document id and /// Validates a string representation to be a correct document id and returns
/// returns the hash of the given type, this is the way we produce documents ids. /// the corresponding id or generate a new one, this is the way we produce documents ids.
pub fn compute_document_id(string: &str) -> Result<DocumentId, SerializerError> { pub fn discover_document_id(
if string.chars().all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') { userid: &str,
let mut s = SipHasher::new(); user_ids: &fst::Map,
string.hash(&mut s); available_ids: &mut DiscoverIds<'_>,
Ok(DocumentId(s.finish())) ) -> Result<DocumentId, SerializerError>
{
if userid.chars().all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') {
match user_ids.get(userid) {
Some(internal_id) => Ok(DocumentId(internal_id)),
None => {
let internal_id = available_ids.next().expect("no more ids available");
Ok(internal_id)
},
}
} else { } else {
Err(SerializerError::InvalidDocumentIdFormat) Err(SerializerError::InvalidDocumentIdFormat)
} }
} }
/// Extracts and validates the document id of a document. /// Extracts and validates the document id of a document.
pub fn extract_document_id(primary_key: &str, document: &IndexMap<String, Value>) -> Result<DocumentId, SerializerError> { pub fn extract_document_id(
primary_key: &str,
document: &IndexMap<String, Value>,
user_ids: &fst::Map,
available_ids: &mut DiscoverIds<'_>,
) -> Result<(DocumentId, String), SerializerError>
{
match document.get(primary_key) { match document.get(primary_key) {
Some(value) => { Some(value) => {
let string = match value { let userid = match value {
Value::Number(number) => number.to_string(), Value::Number(number) => number.to_string(),
Value::String(string) => string.clone(), Value::String(string) => string.clone(),
_ => return Err(SerializerError::InvalidDocumentIdFormat), _ => return Err(SerializerError::InvalidDocumentIdFormat),
}; };
compute_document_id(&string) discover_document_id(&userid, user_ids, available_ids).map(|id| (id, userid))
} }
None => Err(SerializerError::DocumentIdNotFound), None => Err(SerializerError::DocumentIdNotFound),
} }

View File

@ -9,7 +9,7 @@ pub use self::clear_all::{apply_clear_all, push_clear_all};
pub use self::customs_update::{apply_customs_update, push_customs_update}; pub use self::customs_update::{apply_customs_update, push_customs_update};
pub use self::documents_addition::{apply_documents_addition, apply_documents_partial_addition, DocumentsAddition}; pub use self::documents_addition::{apply_documents_addition, apply_documents_partial_addition, DocumentsAddition};
pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion}; pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion};
pub use self::helpers::{index_value, value_to_string, value_to_number, compute_document_id, extract_document_id}; pub use self::helpers::{index_value, value_to_string, value_to_number, discover_document_id, extract_document_id};
pub use self::settings_update::{apply_settings_update, push_settings_update}; pub use self::settings_update::{apply_settings_update, push_settings_update};
use std::cmp; use std::cmp;