data add documents

mpostma 2020-12-23 13:52:28 +01:00
parent 0d7c4beecd
commit 1a38bfd31f
5 changed files with 100 additions and 25 deletions

@@ -1,11 +1,15 @@
use std::ops::Deref;
use std::sync::Arc;
-use sha2::Digest;
+use async_compression::tokio_02::write::GzipEncoder;
+use futures_util::stream::StreamExt;
+use tokio::io::AsyncWriteExt;
use milli::Index;
+use milli::update::{IndexDocumentsMethod, UpdateFormat};
+use sha2::Digest;
use crate::option::Opt;
-use crate::updates::UpdateQueue;
+use crate::updates::{UpdateQueue, UpdateMeta, UpdateStatus, UpdateMetaProgress};
#[derive(Clone)]
pub struct Data {
@@ -75,11 +79,43 @@ impl Data {
Ok(Data { inner })
}
+pub async fn add_documents<B, E>(
+&self,
+method: IndexDocumentsMethod,
+format: UpdateFormat,
+mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
+) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>
+where
+B: Deref<Target = [u8]>,
+E: std::error::Error + Send + Sync + 'static,
+{
+let file = tokio::task::block_in_place(tempfile::tempfile)?;
+let file = tokio::fs::File::from_std(file);
+let mut encoder = GzipEncoder::new(file);
+while let Some(result) = stream.next().await {
+let bytes = &*result?;
+encoder.write_all(&bytes[..]).await?;
+}
+encoder.shutdown().await?;
+let mut file = encoder.into_inner();
+file.sync_all().await?;
+let file = file.into_std().await;
+let mmap = unsafe { memmap::Mmap::map(&file)? };
+let meta = UpdateMeta::DocumentsAddition { method, format };
+let update_id = tokio::task::block_in_place(|| self.update_queue.register_update(&meta, &mmap[..]))?;
+Ok(UpdateStatus::Pending { update_id, meta })
+}
#[inline]
pub fn http_payload_size_limit(&self) -> usize {
self.options.http_payload_size_limit.get_bytes() as usize
}
#[inline]
pub fn api_keys(&self) -> &ApiKeys {
&self.api_keys
}
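
For context, a minimal usage sketch (not part of this commit) showing how the generic bounds work out: bytes::Bytes (an assumed dependency here) satisfies Deref<Target = [u8]> and std::io::Error satisfies the error bound, so an in-memory stream can be fed to the new method, e.g. from a test:

// Hypothetical usage sketch, assuming an async context with `data: Data` in scope
// that can propagate anyhow::Error with `?`.
// Note: the method calls tokio::task::block_in_place, so it needs the threaded runtime.
use futures_util::stream;

let chunks: Vec<Result<bytes::Bytes, std::io::Error>> =
    vec![Ok(bytes::Bytes::from_static(b"[{\"id\": 1, \"title\": \"foo\"}]"))];
let status = data
    .add_documents(
        IndexDocumentsMethod::ReplaceDocuments,
        UpdateFormat::Json,
        stream::iter(chunks),
    )
    .await?;
// On success this is UpdateStatus::Pending { update_id, meta }.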

@@ -93,10 +93,10 @@ async fn update_multiple_documents(
#[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
async fn add_documents(
-_data: web::Data<Data>,
+data: web::Data<Data>,
_path: web::Path<IndexParam>,
_params: web::Query<UpdateDocumentsQuery>,
-_body: web::Json<Vec<Document>>,
+body: web::Json<Vec<Document>>,
) -> Result<HttpResponse, ResponseError> {
todo!()
}
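
The route handler is still a todo!(); a rough sketch of how it could forward the request to Data::add_documents, assuming the extractor is later switched from web::Json<Vec<Document>> to a raw web::Payload stream (hypothetical wiring, not part of this commit):

// Hypothetical follow-up: stream the raw payload straight into Data::add_documents.
async fn add_documents(
    data: web::Data<Data>,
    _path: web::Path<IndexParam>,
    _params: web::Query<UpdateDocumentsQuery>,
    body: web::Payload, // assumed replacement for web::Json<Vec<Document>>
) -> Result<HttpResponse, ResponseError> {
    // Method and format are hardcoded here; in practice they would come from
    // the route and the query parameters.
    match data
        .add_documents(IndexDocumentsMethod::ReplaceDocuments, UpdateFormat::Json, body)
        .await
    {
        Ok(update) => Ok(HttpResponse::Accepted().json(update)),
        Err(e) => Ok(HttpResponse::BadRequest().body(e.to_string())),
    }
}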

@@ -4,6 +4,7 @@ pub use settings::{Settings, Facets};
use std::io;
use std::sync::Arc;
+use std::ops::Deref;
use anyhow::Result;
use flate2::read::GzDecoder;
@@ -20,8 +21,8 @@ use crate::option::Opt;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
-enum UpdateMeta {
-DocumentsAddition { method: String, format: String },
+pub enum UpdateMeta {
+DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat },
ClearDocuments,
Settings(Settings),
Facets(Facets),
@@ -29,7 +30,7 @@ enum UpdateMeta {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
-enum UpdateMetaProgress {
+pub enum UpdateMetaProgress {
DocumentsAddition {
step: usize,
total_steps: usize,
@@ -41,7 +42,7 @@ enum UpdateMetaProgress {
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
#[allow(dead_code)]
-enum UpdateStatus<M, P, N> {
+pub enum UpdateStatus<M, P, N> {
Pending { update_id: u64, meta: M },
Progressing { update_id: u64, meta: P },
Processed { update_id: u64, meta: N },
@@ -53,6 +54,13 @@ pub struct UpdateQueue {
inner: Arc<UpdateStore<UpdateMeta, String>>,
}
+impl Deref for UpdateQueue {
+type Target = Arc<UpdateStore<UpdateMeta, String>>;
+fn deref(&self) -> &Self::Target {
+&self.inner
+}
+}
#[derive(Debug, Clone, StructOpt)]
pub struct IndexerOpts {
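
The new Deref impl is what lets Data::add_documents above call UpdateStore methods directly on the queue; a short illustration (assuming a queue: UpdateQueue plus meta and payload bytes in scope):

// Deref coercion: UpdateQueue -> Arc<UpdateStore<UpdateMeta, String>> -> UpdateStore,
// so store methods resolve without touching the private `inner` field.
let update_id = queue.register_update(&meta, payload)?;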
@@ -164,27 +172,16 @@ impl UpdateHandler {
fn update_documents(
&self,
-format: String,
-method: String,
+format: UpdateFormat,
+method: IndexDocumentsMethod,
content: &[u8],
update_builder: UpdateBuilder,
) -> Result<()> {
// We must use the write transaction of the update here.
let mut wtxn = self.indexes.write_txn()?;
let mut builder = update_builder.index_documents(&mut wtxn, &self.indexes);
-match format.as_str() {
-"csv" => builder.update_format(UpdateFormat::Csv),
-"json" => builder.update_format(UpdateFormat::Json),
-"json-stream" => builder.update_format(UpdateFormat::JsonStream),
-otherwise => panic!("invalid update format {:?}", otherwise),
-};
-match method.as_str() {
-"replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
-"update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
-otherwise => panic!("invalid indexing method {:?}", otherwise),
-};
+builder.update_format(format);
+builder.index_documents_method(method);
let gzipped = true;
let reader = if gzipped {