Update the Transform struct to support JSON stream updates

This commit is contained in:
Clément Renault 2020-11-01 11:50:10 +01:00
parent 082ad84914
commit f0e63025b0
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 86 additions and 19 deletions

View File

@ -224,8 +224,9 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
match format.as_str() { match format.as_str() {
"json" => builder.update_format(UpdateFormat::Json),
"csv" => builder.update_format(UpdateFormat::Csv), "csv" => builder.update_format(UpdateFormat::Csv),
"json" => builder.update_format(UpdateFormat::Json),
"json-stream" => builder.update_format(UpdateFormat::JsonStream),
otherwise => panic!("invalid update format {:?}", otherwise), otherwise => panic!("invalid update format {:?}", otherwise),
}; };
@ -491,6 +492,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let format = match update_format { let format = match update_format {
UpdateFormat::Csv => String::from("csv"), UpdateFormat::Csv => String::from("csv"),
UpdateFormat::Json => String::from("json"), UpdateFormat::Json => String::from("json"),
UpdateFormat::JsonStream => String::from("json-stream"),
}; };
let meta = UpdateMeta::DocumentsAddition { method, format }; let meta = UpdateMeta::DocumentsAddition { method, format };
@ -540,6 +542,23 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
) )
}); });
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let indexing_route_json_stream = warp::filters::method::post()
.and(warp::path!("documents"))
.and(warp::header::exact_ignore_case("content-type", "application/x-ndjson"))
.and(warp::filters::query::query())
.and(warp::body::stream())
.and_then(move |params: QueryUpdate, stream| {
buf_stream(
update_store_cloned.clone(),
update_status_sender_cloned.clone(),
params.method,
UpdateFormat::JsonStream,
stream,
)
});
let update_status_sender_cloned = update_status_sender.clone(); let update_status_sender_cloned = update_status_sender.clone();
let clearing_route = warp::filters::method::post() let clearing_route = warp::filters::method::post()
.and(warp::path!("clear-documents")) .and(warp::path!("clear-documents"))
@ -595,6 +614,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
.or(query_route) .or(query_route)
.or(indexing_route_csv) .or(indexing_route_csv)
.or(indexing_route_json) .or(indexing_route_json)
.or(indexing_route_json_stream)
.or(clearing_route) .or(clearing_route)
.or(update_ws_route); .or(update_ws_route);

View File

@ -187,6 +187,8 @@ pub enum UpdateFormat {
Csv, Csv,
/// The given update is a JSON array with documents inside. /// The given update is a JSON array with documents inside.
Json, Json,
/// The given update is a JSON stream with a document on each line.
JsonStream,
} }
pub struct IndexDocuments<'t, 'u, 'i> { pub struct IndexDocuments<'t, 'u, 'i> {
@ -306,6 +308,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
let output = match self.update_format { let output = match self.update_format {
UpdateFormat::Csv => transform.from_csv(reader)?, UpdateFormat::Csv => transform.from_csv(reader)?,
UpdateFormat::Json => transform.from_json(reader)?, UpdateFormat::Json => transform.from_json(reader)?,
UpdateFormat::JsonStream => transform.from_json_stream(reader)?,
}; };
let TransformOutput { let TransformOutput {
@ -844,4 +847,30 @@ mod tests {
assert_eq!(count, 0); assert_eq!(count, 0);
drop(rtxn); drop(rtxn);
} }
#[test]
fn json_stream_documents() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
// First we send 3 documents with an id for only one of them.
let mut wtxn = index.write_txn().unwrap();
let content = &br#"
{ "name": "kevin" }
{ "name": "kevina", "id": 21 }
{ "name": "benoit" }
"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::JsonStream);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
drop(rtxn);
}
} }

View File

@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fs::File; use std::fs::File;
use std::io::{Read, Seek, SeekFrom}; use std::io::{Read, Seek, SeekFrom};
use std::iter::Peekable;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use fst::{IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
@ -24,6 +25,12 @@ pub struct TransformOutput {
pub documents_file: File, pub documents_file: File,
} }
/// Extract the users ids, deduplicate and compute the new internal documents ids
/// and fields ids, writing all the documents under their internal ids into a final file.
///
/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
/// the replaced documents ids, the number of documents in this update and the file
/// containing all those documents.
pub struct Transform<'t, 'i> { pub struct Transform<'t, 'i> {
pub rtxn: &'t heed::RoTxn<'i>, pub rtxn: &'t heed::RoTxn<'i>,
pub index: &'i Index, pub index: &'i Index,
@ -37,26 +44,41 @@ pub struct Transform<'t, 'i> {
} }
impl Transform<'_, '_> { impl Transform<'_, '_> {
/// Extract the users ids, deduplicate and compute the new internal documents ids
/// and fields ids, writing all the documents under their internal ids into a final file.
///
/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
/// the replaced documents ids, the number of documents in this update and the file
/// containing all those documents.
pub fn from_json<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> { pub fn from_json<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
self.from_generic_json(reader, false)
}
pub fn from_json_stream<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
self.from_generic_json(reader, true)
}
fn from_generic_json<R: Read>(self, reader: R, is_stream: bool) -> anyhow::Result<TransformOutput> {
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
let primary_key = self.index.primary_key(self.rtxn)?; let primary_key = self.index.primary_key(self.rtxn)?;
// Deserialize the whole batch of documents in memory. // Deserialize the whole batch of documents in memory.
let documents: Vec<Map<String, Value>> = serde_json::from_reader(reader)?; let mut documents: Peekable<Box<dyn Iterator<Item=serde_json::Result<Map<String, Value>>>>> = if is_stream {
let iter = serde_json::Deserializer::from_reader(reader).into_iter();
let iter = Box::new(iter) as Box<dyn Iterator<Item=_>>;
iter.peekable()
} else {
let vec: Vec<_> = serde_json::from_reader(reader)?;
let iter = vec.into_iter().map(Ok);
let iter = Box::new(iter) as Box<dyn Iterator<Item=_>>;
iter.peekable()
};
// We extract the primary key from the first document in // We extract the primary key from the first document in
// the batch if it hasn't already been defined in the index. // the batch if it hasn't already been defined in the index.
let primary_key = match primary_key { let primary_key = match primary_key {
Some(primary_key) => primary_key, Some(primary_key) => primary_key,
None => { None => {
match documents.get(0).and_then(|doc| doc.keys().find(|k| k.contains("id"))) { // We ignore a potential error here as we can't early return it now,
// the peek method gives us only a reference on the next item,
// we will eventually return it in the iteration just after.
let first = documents.peek().and_then(|r| r.as_ref().ok());
match first.and_then(|doc| doc.keys().find(|k| k.contains("id"))) {
Some(key) => fields_ids_map.insert(&key).context("field id limit reached")?, Some(key) => fields_ids_map.insert(&key).context("field id limit reached")?,
None => { None => {
if !self.autogenerate_docids { if !self.autogenerate_docids {
@ -70,7 +92,7 @@ impl Transform<'_, '_> {
}, },
}; };
if documents.is_empty() { if documents.peek().is_none() {
return Ok(TransformOutput { return Ok(TransformOutput {
primary_key, primary_key,
fields_ids_map, fields_ids_map,
@ -110,7 +132,9 @@ impl Transform<'_, '_> {
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
for mut document in documents { for result in documents {
let mut document = result?;
obkv_buffer.clear(); obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut obkv_buffer); let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
@ -155,12 +179,6 @@ impl Transform<'_, '_> {
self.from_sorter(sorter, primary_key, fields_ids_map, users_ids_documents_ids) self.from_sorter(sorter, primary_key, fields_ids_map, users_ids_documents_ids)
} }
/// Extract the users ids, deduplicate and compute the new internal documents ids
/// and fields ids, writing all the documents under their internal ids into a final file.
///
/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
/// the replaced documents ids, the number of documents in this update and the file
/// containing all those documents.
pub fn from_csv<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> { pub fn from_csv<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
@ -261,8 +279,8 @@ impl Transform<'_, '_> {
self.from_sorter(sorter, primary_key_field_id, fields_ids_map, users_ids_documents_ids) self.from_sorter(sorter, primary_key_field_id, fields_ids_map, users_ids_documents_ids)
} }
/// Generate the TransformOutput based on the given sorter that can be generated from any /// Generate the `TransformOutput` based on the given sorter that can be generated from any
/// format like CSV, JSON or JSON lines. This sorter must contain a key that is the document /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
/// id for the user side and the value must be an obkv where keys are valid fields ids. /// id for the user side and the value must be an obkv where keys are valid fields ids.
fn from_sorter( fn from_sorter(
self, self,