diff --git a/Cargo.lock b/Cargo.lock index 63c0ea192..6ae7d16ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1054,6 +1054,7 @@ dependencies = [ "structopt", "tempfile", "tokio", + "uuid", "warp", ] @@ -2259,6 +2260,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" +[[package]] +name = "uuid" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" +dependencies = [ + "rand 0.7.3", +] + [[package]] name = "version_check" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index d4a67c623..27f4d3a2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ smallstr = { version = "0.2.0", features = ["serde"] } smallvec = "1.4.0" structopt = { version = "0.3.14", default-features = false, features = ["wrap_help"] } tempfile = "3.1.0" +uuid = { version = "0.8.1", features = ["v4"] } # documents words self-join itertools = "0.9.0" diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 10fd83f36..bec82b01e 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -275,6 +275,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { }; let TransformOutput { + primary_key, fields_ids_map, users_ids_documents_ids, new_documents_ids, @@ -415,6 +416,9 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { // We write the fields ids map into the main database self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; + // We write the primary key field id into the main database + self.index.put_primary_key(self.wtxn, primary_key)?; + // We write the users_ids_documents_ids into the main database. self.index.put_users_ids_documents_ids(self.wtxn, &users_ids_documents_ids)?; @@ -597,4 +601,74 @@ mod tests { assert_eq!(doc_iter.next(), None); drop(rtxn); } + + #[test] + fn simple_auto_generated_documents_ids() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + + let index = Index::new(options, &path).unwrap(); + + // First we send 3 documents with ids from 1 to 3. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; + IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is 3 documents now. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 3); + + let docs = index.documents(&rtxn, vec![0, 1, 2]).unwrap(); + let (_id, obkv) = docs.iter().find(|(_id, kv)| kv.get(0) == Some(br#""kevin""#)).unwrap(); + let kevin_uuid: String = serde_json::from_slice(&obkv.get(1).unwrap()).unwrap(); + drop(rtxn); + + // Second we send 1 document with the generated uuid, to erase the previous ones. + let mut wtxn = index.write_txn().unwrap(); + let content = format!("id,name\n{},updated kevin", kevin_uuid); + IndexDocuments::new(&mut wtxn, &index).execute(content.as_bytes(), |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is **always** 3 documents. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 3); + drop(rtxn); + } + + #[test] + fn reordered_auto_generated_documents_ids() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + + let index = Index::new(options, &path).unwrap(); + + // First we send 3 documents with ids from 1 to 3. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; + IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is 3 documents now. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 3); + drop(rtxn); + + // Second we send 1 document without specifying the id. + let mut wtxn = index.write_txn().unwrap(); + let content = &b"name\nnew kevin"[..]; + IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); + wtxn.commit().unwrap(); + + // Check that there is 4 documents now. + let rtxn = index.read_txn().unwrap(); + let count = index.number_of_documents(&rtxn).unwrap(); + assert_eq!(count, 4); + drop(rtxn); + } } diff --git a/src/update/index_documents/transform.rs b/src/update/index_documents/transform.rs index b6ec88879..33f18a48a 100644 --- a/src/update/index_documents/transform.rs +++ b/src/update/index_documents/transform.rs @@ -14,6 +14,7 @@ use super::merge_function::merge_two_obkvs; use super::{create_writer, create_sorter, IndexDocumentsMethod}; pub struct TransformOutput { + pub primary_key: u8, pub fields_ids_map: FieldsIdsMap, pub users_ids_documents_ids: fst::Map>, pub new_documents_ids: RoaringBitmap, @@ -23,7 +24,7 @@ pub struct TransformOutput { } pub struct Transform<'t, 'i> { - pub rtxn: &'t heed::RoTxn<'t>, + pub rtxn: &'t heed::RoTxn<'i>, pub index: &'i Index, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, @@ -47,17 +48,43 @@ impl Transform<'_, '_> { let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); let mut csv = csv::Reader::from_reader(reader); - let headers = csv.headers()?.clone(); - let user_id_pos = headers.iter().position(|h| h == "id").context(r#"missing "id" header"#)?; + let headers = csv.headers()?; + let primary_key = self.index.primary_key(self.rtxn)?; // Generate the new fields ids based on the current fields ids and this CSV headers. let mut fields_ids = Vec::new(); - for header in headers.iter() { - let id = fields_ids_map.insert(header) - .context("impossible to generate a field id (limit reached)")?; - fields_ids.push(id); + for (i, header) in headers.iter().enumerate() { + let id = fields_ids_map.insert(header).context("field id limit reached)")?; + fields_ids.push((id, i)); } + // Extract the position of the primary key in the current headers, None if not found. + let user_id_pos = match primary_key { + Some(primary_key) => { + // Te primary key have is known so we must find the position in the CSV headers. + let name = fields_ids_map.name(primary_key).expect("found the primary key name"); + headers.iter().position(|h| h == name) + }, + None => headers.iter().position(|h| h.contains("id")), + }; + + // Returns the field id in the fileds ids map, create an "id" field + // in case it is not in the current headers. + let primary_key_field_id = match user_id_pos { + Some(pos) => fields_ids_map.id(&headers[pos]).expect("found the primary key"), + None => { + let id = fields_ids_map.insert("id").context("field id limit reached")?; + // We make sure to add the primary key field id to the fields ids, + // this way it is added to the obks. + fields_ids.push((id, usize::max_value())); + id + }, + }; + + // We sort the fields ids by the fields ids map id, this way we are sure to iterate over + // the records fields in the fields ids map order and correctly generate the obkv. + fields_ids.sort_unstable_by_key(|(field_id, _)| *field_id); + /// Only the last value associated with an id is kept. fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result> { obkvs.last().context("no last value").map(|last| last.clone().into_owned()) @@ -77,24 +104,37 @@ impl Transform<'_, '_> { // based on the users ids. let mut json_buffer = Vec::new(); let mut obkv_buffer = Vec::new(); + let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; let mut record = csv::StringRecord::new(); while csv.read_record(&mut record)? { obkv_buffer.clear(); let mut writer = obkv::KvWriter::new(&mut obkv_buffer); - // We retrieve the field id based on the CSV header position - // and zip it with the record value. - for (key, field) in fields_ids.iter().copied().zip(&record) { + // We extract the user id if we know where it is or generate an UUID V4 otherwise. + // TODO we must validate the user id (i.e. [a-zA-Z0-9\-_]). + let user_id = match user_id_pos { + Some(pos) => &record[pos], + None => uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer), + }; + + // When the primary_key_field_id is found in the fields ids list + // we return the generated document id instead of the record field. + let iter = fields_ids.iter() + .map(|(fi, i)| { + let field = if *fi == primary_key_field_id { user_id } else { &record[*i] }; + (fi, field) + }); + + // We retrieve the field id based on the fields ids map fields ids order. + for (field_id, field) in iter { // We serialize the attribute values as JSON strings. json_buffer.clear(); serde_json::to_writer(&mut json_buffer, &field)?; - writer.insert(key, &json_buffer)?; + writer.insert(*field_id, &json_buffer)?; } - // We extract the user id and use it as the key for this document. - // TODO we must validate the user id (i.e. [a-zA-Z0-9\-_]). - let user_id = &record[user_id_pos]; + // We use the extracted/generated user id as the key for this document. sorter.insert(user_id, &obkv_buffer)?; } @@ -179,6 +219,7 @@ impl Transform<'_, '_> { } Ok(TransformOutput { + primary_key: primary_key_field_id, fields_ids_map, users_ids_documents_ids: users_ids_documents_ids_builder.into_map(), new_documents_ids,