Generate a uuid v4 based document id when missing

This commit is contained in:
Clément Renault 2020-10-31 12:54:43 +01:00
parent ddbd336387
commit 9d47ee52b4
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 140 additions and 14 deletions

10
Cargo.lock generated
View File

@ -1054,6 +1054,7 @@ dependencies = [
"structopt", "structopt",
"tempfile", "tempfile",
"tokio", "tokio",
"uuid",
"warp", "warp",
] ]
@ -2259,6 +2260,15 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
[[package]]
name = "uuid"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
dependencies = [
"rand 0.7.3",
]
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.1.5" version = "0.1.5"

View File

@ -33,6 +33,7 @@ smallstr = { version = "0.2.0", features = ["serde"] }
smallvec = "1.4.0" smallvec = "1.4.0"
structopt = { version = "0.3.14", default-features = false, features = ["wrap_help"] } structopt = { version = "0.3.14", default-features = false, features = ["wrap_help"] }
tempfile = "3.1.0" tempfile = "3.1.0"
uuid = { version = "0.8.1", features = ["v4"] }
# documents words self-join # documents words self-join
itertools = "0.9.0" itertools = "0.9.0"

View File

@ -275,6 +275,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
}; };
let TransformOutput { let TransformOutput {
primary_key,
fields_ids_map, fields_ids_map,
users_ids_documents_ids, users_ids_documents_ids,
new_documents_ids, new_documents_ids,
@ -415,6 +416,9 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
// We write the fields ids map into the main database // We write the fields ids map into the main database
self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
// We write the primary key field id into the main database
self.index.put_primary_key(self.wtxn, primary_key)?;
// We write the users_ids_documents_ids into the main database. // We write the users_ids_documents_ids into the main database.
self.index.put_users_ids_documents_ids(self.wtxn, &users_ids_documents_ids)?; self.index.put_users_ids_documents_ids(self.wtxn, &users_ids_documents_ids)?;
@ -597,4 +601,74 @@ mod tests {
assert_eq!(doc_iter.next(), None); assert_eq!(doc_iter.next(), None);
drop(rtxn); drop(rtxn);
} }
#[test]
fn simple_auto_generated_documents_ids() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
// First we index 3 documents with no "id" column at all,
// so a uuid v4 document id must be generated for each of them.
let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nkevin\nkevina\nbenoit\n"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there are 3 documents now.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
// Fetch the document whose first field is "kevin" and read back the
// uuid that was generated for it (stored as a JSON string in field 1).
let docs = index.documents(&rtxn, vec![0, 1, 2]).unwrap();
let (_id, obkv) = docs.iter().find(|(_id, kv)| kv.get(0) == Some(br#""kevin""#)).unwrap();
let kevin_uuid: String = serde_json::from_slice(&obkv.get(1).unwrap()).unwrap();
drop(rtxn);
// Second we send 1 document reusing that generated uuid as its id,
// which must replace the existing "kevin" document instead of adding one.
let mut wtxn = index.write_txn().unwrap();
let content = format!("id,name\n{},updated kevin", kevin_uuid);
IndexDocuments::new(&mut wtxn, &index).execute(content.as_bytes(), |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there are **still** 3 documents (update, not insert).
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
drop(rtxn);
}
#[test]
fn reordered_auto_generated_documents_ids() {
let dir = tempfile::tempdir().unwrap();
let mut env_options = EnvOpenOptions::new();
env_options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(env_options, &dir).unwrap();

// Index three documents that each carry an explicit "id" field.
let mut txn = index.write_txn().unwrap();
let csv = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
IndexDocuments::new(&mut txn, &index).execute(csv, |_, _| ()).unwrap();
txn.commit().unwrap();

// The index must now contain exactly those three documents.
{
    let txn = index.read_txn().unwrap();
    assert_eq!(index.number_of_documents(&txn).unwrap(), 3);
}

// Now index one more document whose "id" column is absent entirely,
// forcing the engine to auto-generate a document id for it.
let mut txn = index.write_txn().unwrap();
let csv = &b"name\nnew kevin"[..];
IndexDocuments::new(&mut txn, &index).execute(csv, |_, _| ()).unwrap();
txn.commit().unwrap();

// A fresh id was generated, so the document count must grow to four.
{
    let txn = index.read_txn().unwrap();
    assert_eq!(index.number_of_documents(&txn).unwrap(), 4);
}
}
} }

View File

@ -14,6 +14,7 @@ use super::merge_function::merge_two_obkvs;
use super::{create_writer, create_sorter, IndexDocumentsMethod}; use super::{create_writer, create_sorter, IndexDocumentsMethod};
pub struct TransformOutput { pub struct TransformOutput {
pub primary_key: u8,
pub fields_ids_map: FieldsIdsMap, pub fields_ids_map: FieldsIdsMap,
pub users_ids_documents_ids: fst::Map<Vec<u8>>, pub users_ids_documents_ids: fst::Map<Vec<u8>>,
pub new_documents_ids: RoaringBitmap, pub new_documents_ids: RoaringBitmap,
@ -23,7 +24,7 @@ pub struct TransformOutput {
} }
pub struct Transform<'t, 'i> { pub struct Transform<'t, 'i> {
pub rtxn: &'t heed::RoTxn<'t>, pub rtxn: &'t heed::RoTxn<'i>,
pub index: &'i Index, pub index: &'i Index,
pub chunk_compression_type: CompressionType, pub chunk_compression_type: CompressionType,
pub chunk_compression_level: Option<u32>, pub chunk_compression_level: Option<u32>,
@ -47,17 +48,43 @@ impl Transform<'_, '_> {
let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
let mut csv = csv::Reader::from_reader(reader); let mut csv = csv::Reader::from_reader(reader);
let headers = csv.headers()?.clone(); let headers = csv.headers()?;
let user_id_pos = headers.iter().position(|h| h == "id").context(r#"missing "id" header"#)?; let primary_key = self.index.primary_key(self.rtxn)?;
// Generate the new fields ids based on the current fields ids and this CSV headers. // Generate the new fields ids based on the current fields ids and this CSV headers.
let mut fields_ids = Vec::new(); let mut fields_ids = Vec::new();
for header in headers.iter() { for (i, header) in headers.iter().enumerate() {
let id = fields_ids_map.insert(header) let id = fields_ids_map.insert(header).context("field id limit reached")?;
.context("impossible to generate a field id (limit reached)")?; fields_ids.push((id, i));
fields_ids.push(id);
} }
// Extract the position of the primary key in the current headers, None if not found.
let user_id_pos = match primary_key {
Some(primary_key) => {
// The primary key is known so we must find its position in the CSV headers.
let name = fields_ids_map.name(primary_key).expect("found the primary key name");
headers.iter().position(|h| h == name)
},
None => headers.iter().position(|h| h.contains("id")),
};
// Returns the field id in the fields ids map, creating an "id" field
// in case it is not in the current headers.
let primary_key_field_id = match user_id_pos {
Some(pos) => fields_ids_map.id(&headers[pos]).expect("found the primary key"),
None => {
let id = fields_ids_map.insert("id").context("field id limit reached")?;
// We make sure to add the primary key field id to the fields ids,
// this way it is added to the obkvs.
fields_ids.push((id, usize::max_value()));
id
},
};
// We sort the fields ids by the fields ids map id, this way we are sure to iterate over
// the records fields in the fields ids map order and correctly generate the obkv.
fields_ids.sort_unstable_by_key(|(field_id, _)| *field_id);
/// Only the last value associated with an id is kept. /// Only the last value associated with an id is kept.
fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
obkvs.last().context("no last value").map(|last| last.clone().into_owned()) obkvs.last().context("no last value").map(|last| last.clone().into_owned())
@ -77,24 +104,37 @@ impl Transform<'_, '_> {
// based on the users ids. // based on the users ids.
let mut json_buffer = Vec::new(); let mut json_buffer = Vec::new();
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
let mut record = csv::StringRecord::new(); let mut record = csv::StringRecord::new();
while csv.read_record(&mut record)? { while csv.read_record(&mut record)? {
obkv_buffer.clear(); obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut obkv_buffer); let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
// We retrieve the field id based on the CSV header position // We extract the user id if we know where it is or generate an UUID V4 otherwise.
// and zip it with the record value. // TODO we must validate the user id (i.e. [a-zA-Z0-9\-_]).
for (key, field) in fields_ids.iter().copied().zip(&record) { let user_id = match user_id_pos {
Some(pos) => &record[pos],
None => uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer),
};
// When the primary_key_field_id is found in the fields ids list
// we return the generated document id instead of the record field.
let iter = fields_ids.iter()
.map(|(fi, i)| {
let field = if *fi == primary_key_field_id { user_id } else { &record[*i] };
(fi, field)
});
// We retrieve the field id based on the fields ids map fields ids order.
for (field_id, field) in iter {
// We serialize the attribute values as JSON strings. // We serialize the attribute values as JSON strings.
json_buffer.clear(); json_buffer.clear();
serde_json::to_writer(&mut json_buffer, &field)?; serde_json::to_writer(&mut json_buffer, &field)?;
writer.insert(key, &json_buffer)?; writer.insert(*field_id, &json_buffer)?;
} }
// We extract the user id and use it as the key for this document. // We use the extracted/generated user id as the key for this document.
// TODO we must validate the user id (i.e. [a-zA-Z0-9\-_]).
let user_id = &record[user_id_pos];
sorter.insert(user_id, &obkv_buffer)?; sorter.insert(user_id, &obkv_buffer)?;
} }
@ -179,6 +219,7 @@ impl Transform<'_, '_> {
} }
Ok(TransformOutput { Ok(TransformOutput {
primary_key: primary_key_field_id,
fields_ids_map, fields_ids_map,
users_ids_documents_ids: users_ids_documents_ids_builder.into_map(), users_ids_documents_ids: users_ids_documents_ids_builder.into_map(),
new_documents_ids, new_documents_ids,