add primary key and update documents

This commit is contained in:
mpostma 2021-02-13 12:22:59 +01:00
parent 3a634cb583
commit 1eaf28f823
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 95 additions and 35 deletions

View File

@ -16,6 +16,7 @@ impl Data {
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin, mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> ) -> anyhow::Result<UpdateStatus>
where where
B: Deref<Target = [u8]>, B: Deref<Target = [u8]>,
@ -47,7 +48,7 @@ impl Data {
mmap = unsafe { memmap::Mmap::map(&file)? }; mmap = unsafe { memmap::Mmap::map(&file)? };
&mmap &mmap
}; };
index_controller.add_documents(index, method, format, &bytes) index_controller.add_documents(index, method, format, &bytes, primary_key)
}).await??; }).await??;
Ok(update.into()) Ok(update.into())
} }

View File

@ -40,9 +40,10 @@ impl IndexController for LocalIndexController {
method: milli::update::IndexDocumentsMethod, method: milli::update::IndexDocumentsMethod,
format: milli::update::UpdateFormat, format: milli::update::UpdateFormat,
data: &[u8], data: &[u8],
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> { ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
let meta = UpdateMeta::DocumentsAddition { method, format }; let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
let pending = update_store.register_update(meta, data)?; let pending = update_store.register_update(meta, data)?;
Ok(pending.into()) Ok(pending.into())
} }

View File

@ -1,19 +1,19 @@
use std::collections::HashMap;
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use grenad::CompressionType; use grenad::CompressionType;
use log::info; use log::info;
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use milli::Index; use milli::Index;
use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod};
use rayon::ThreadPool; use rayon::ThreadPool;
use crate::index_controller::updates::{Processing, Processed, Failed};
use crate::index_controller::{UpdateResult, UpdateMeta, Settings, Facets};
use crate::option::IndexerOpts;
use super::update_store::HandleUpdate; use super::update_store::HandleUpdate;
use crate::index_controller::updates::{Failed, Processed, Processing};
use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult};
use crate::option::IndexerOpts;
pub struct UpdateHandler { pub struct UpdateHandler {
index: Arc<Index>, index: Arc<Index>,
@ -70,9 +70,19 @@ impl UpdateHandler {
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
content: &[u8], content: &[u8],
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> { ) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?; let mut wtxn = self.index.write_txn()?;
// Set the primary key if not set already, ignore if already set.
match (self.index.primary_key(&wtxn)?, primary_key) {
(None, Some(ref primary_key)) => {
self.index.put_primary_key(&mut wtxn, primary_key)?;
}
_ => (),
}
let mut builder = update_builder.index_documents(&mut wtxn, &self.index); let mut builder = update_builder.index_documents(&mut wtxn, &self.index);
builder.update_format(format); builder.update_format(format);
builder.index_documents_method(method); builder.index_documents_method(method);
@ -84,14 +94,16 @@ impl UpdateHandler {
Box::new(content) as Box<dyn io::Read> Box::new(content) as Box<dyn io::Read>
}; };
let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); let result = builder.execute(reader, |indexing_step, update_id| {
info!("update {}: {:?}", update_id, indexing_step)
});
match result { match result {
Ok(addition_result) => wtxn Ok(addition_result) => wtxn
.commit() .commit()
.and(Ok(UpdateResult::DocumentsAddition(addition_result))) .and(Ok(UpdateResult::DocumentsAddition(addition_result)))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into()),
} }
} }
@ -105,11 +117,15 @@ impl UpdateHandler {
.commit() .commit()
.and(Ok(UpdateResult::Other)) .and(Ok(UpdateResult::Other))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into()),
} }
} }
fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> { fn update_settings(
&self,
settings: &Settings,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?; let mut wtxn = self.index.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &self.index); let mut builder = update_builder.settings(&mut wtxn, &self.index);
@ -144,21 +160,22 @@ impl UpdateHandler {
} }
} }
let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); let result = builder
.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
match result { match result {
Ok(()) => wtxn Ok(()) => wtxn
.commit() .commit()
.and(Ok(UpdateResult::Other)) .and(Ok(UpdateResult::Other))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into()),
} }
} }
fn update_facets( fn update_facets(
&self, &self,
levels: &Facets, levels: &Facets,
update_builder: UpdateBuilder update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> { ) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.index.write_txn()?; let mut wtxn = self.index.write_txn()?;
@ -174,7 +191,7 @@ impl UpdateHandler {
.commit() .commit()
.and(Ok(UpdateResult::Other)) .and(Ok(UpdateResult::Other))
.map_err(Into::into), .map_err(Into::into),
Err(e) => Err(e.into()) Err(e) => Err(e.into()),
} }
} }
@ -204,7 +221,7 @@ impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
fn handle_update( fn handle_update(
&mut self, &mut self,
meta: Processing<UpdateMeta>, meta: Processing<UpdateMeta>,
content: &[u8] content: &[u8],
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> { ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
use UpdateMeta::*; use UpdateMeta::*;
@ -213,7 +230,17 @@ impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
let update_builder = self.update_buidler(update_id); let update_builder = self.update_buidler(update_id);
let result = match meta.meta() { let result = match meta.meta() {
DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), DocumentsAddition {
method,
format,
primary_key,
} => self.update_documents(
*format,
*method,
content,
update_builder,
primary_key.as_deref(),
),
ClearDocuments => self.clear_documents(update_builder), ClearDocuments => self.clear_documents(update_builder),
DeleteDocuments => self.delete_documents(content, update_builder), DeleteDocuments => self.delete_documents(content, update_builder),
Settings(settings) => self.update_settings(settings, update_builder), Settings(settings) => self.update_settings(settings, update_builder),

View File

@ -31,7 +31,11 @@ pub struct IndexMetadata {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum UpdateMeta { pub enum UpdateMeta {
DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat }, DocumentsAddition {
method: IndexDocumentsMethod,
format: UpdateFormat,
primary_key: Option<String>,
},
ClearDocuments, ClearDocuments,
DeleteDocuments, DeleteDocuments,
Settings(Settings), Settings(Settings),
@ -128,6 +132,7 @@ pub trait IndexController {
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
data: &[u8], data: &[u8],
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus>; ) -> anyhow::Result<UpdateStatus>;
/// Clear all documents in the given index. /// Clear all documents in the given index.

View File

@ -127,17 +127,7 @@ async fn get_all_documents(
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)] #[serde(rename_all = "camelCase", deny_unknown_fields)]
struct UpdateDocumentsQuery { struct UpdateDocumentsQuery {
_primary_key: Option<String>, primary_key: Option<String>,
}
async fn update_multiple_documents(
_data: web::Data<Data>,
_path: web::Path<IndexParam>,
_params: web::Query<UpdateDocumentsQuery>,
_body: web::Json<Vec<Document>>,
_is_partial: bool,
) -> Result<HttpResponse, ResponseError> {
todo!()
} }
/// Route used when the payload type is "application/json" /// Route used when the payload type is "application/json"
@ -149,15 +139,16 @@ async fn update_multiple_documents(
async fn add_documents_json( async fn add_documents_json(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
_params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let addition_result = data let addition_result = data
.add_documents( .add_documents(
path.into_inner().index_uid, path.into_inner().index_uid,
IndexDocumentsMethod::UpdateDocuments, IndexDocumentsMethod::ReplaceDocuments,
UpdateFormat::Json, UpdateFormat::Json,
body body,
params.primary_key.clone(),
).await; ).await;
match addition_result { match addition_result {
@ -174,7 +165,7 @@ async fn add_documents_json(
} }
/// Default route for addign documents, this should return an error en redirect to the docuentation /// Default route for adding documents, this should return an error and redirect to the documentation
#[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] #[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
async fn add_documents_default( async fn add_documents_default(
_data: web::Data<Data>, _data: web::Data<Data>,
@ -186,14 +177,49 @@ async fn add_documents_default(
todo!() todo!()
} }
/// Default route for adding documents, this should return an error and redirect to the documentation
#[put("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] #[put("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
async fn update_documents_default(
_data: web::Data<Data>,
_path: web::Path<IndexParam>,
_params: web::Query<UpdateDocumentsQuery>,
_body: web::Json<Vec<Document>>,
) -> Result<HttpResponse, ResponseError> {
error!("Unknown document type");
todo!()
}
#[put(
"/indexes/{index_uid}/documents",
wrap = "Authentication::Private",
guard = "guard_json",
)]
async fn update_documents( async fn update_documents(
data: web::Data<Data>, data: web::Data<Data>,
path: web::Path<IndexParam>, path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: web::Json<Vec<Document>>, body: web::Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
update_multiple_documents(data, path, params, body, true).await let addition_result = data
.add_documents(
path.into_inner().index_uid,
IndexDocumentsMethod::UpdateDocuments,
UpdateFormat::Json,
body,
params.primary_key.clone(),
).await;
match addition_result {
Ok(update) => {
let value = serde_json::to_string(&update).unwrap();
let response = HttpResponse::Ok().body(value);
Ok(response)
}
Err(e) => {
error!("{}", e);
todo!()
}
}
} }
#[post( #[post(