Serialize the CSV record values as JSON strings

This commit is contained in:
Clément Renault 2020-10-24 14:02:29 +02:00
parent 656a851830
commit b44b04d25b
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
3 changed files with 18 additions and 14 deletions

View file

@ -1,12 +1,12 @@
use std::borrow::Cow;
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use std::fs::File;
use std::io::{Read, Write, Seek, SeekFrom};
use std::io::{Read, Seek, SeekFrom};
use anyhow::{bail, Context};
use anyhow::Context;
use crate::{FieldsIdsMap, AvailableDocumentsIds};
use fst::{IntoStreamer, Streamer};
use grenad::{Writer, Sorter, Reader, CompressionType};
use grenad::{Writer, Sorter, CompressionType};
use roaring::RoaringBitmap;
pub struct TransformOutput {
@ -49,7 +49,7 @@ impl<A: AsRef<[u8]>> Transform<A> {
/// The last value associated with an id is kept.
fn merge_last_win(_key: &[u8], vals: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
Ok(vals.last().unwrap().clone().into_owned())
vals.last().context("no last value").map(|last| last.clone().into_owned())
}
// We initialize the sorter with the user indexing settings.
@ -63,23 +63,27 @@ impl<A: AsRef<[u8]>> Transform<A> {
// We write into the sorter to merge and deduplicate the documents
// based on the users ids.
let mut sorter = sorter_builder.build();
let mut buffer = Vec::new();
let mut json_buffer = Vec::new();
let mut obkv_buffer = Vec::new();
let mut record = csv::StringRecord::new();
while csv.read_record(&mut record)? {
buffer.clear();
let mut writer = obkv::KvWriter::new(&mut buffer);
obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
// We retrieve the field id based on the CSV header position
// and zip it with the record value.
for (key, field) in fields_ids.iter().copied().zip(&record) {
// TODO we must serialize the values as JSON strings.
writer.insert(key, field)?;
// We serialize the attribute values as JSON strings.
json_buffer.clear();
serde_json::to_writer(&mut json_buffer, &field)?;
writer.insert(key, &json_buffer)?;
}
// We extract the user id and use it as the key for this document.
// TODO we must validate the user id (i.e. [a-zA-Z0-9\-_]).
let user_id = &record[user_id_pos];
sorter.insert(user_id, &buffer)?;
sorter.insert(user_id, &obkv_buffer)?;
}
// Once we have sort and deduplicated the documents we write them into a final file.
@ -129,7 +133,7 @@ impl<A: AsRef<[u8]>> Transform<A> {
// We create the union between the existing users ids documents ids with the new ones.
let new_users_ids_documents_ids = new_users_ids_documents_ids_builder.into_map();
let mut union_ = fst::map::OpBuilder::new()
let union_ = fst::map::OpBuilder::new()
.add(&self.users_ids_documents_ids)
.add(&new_users_ids_documents_ids)
.r#union();