diff --git a/src/data/updates.rs b/src/data/updates.rs index 9fd2015f6..e2bcba9d2 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -16,6 +16,7 @@ impl Data { method: IndexDocumentsMethod, format: UpdateFormat, mut stream: impl futures::Stream> + Unpin, + primary_key: Option, ) -> anyhow::Result where B: Deref, @@ -47,7 +48,7 @@ impl Data { mmap = unsafe { memmap::Mmap::map(&file)? }; &mmap }; - index_controller.add_documents(index, method, format, &bytes) + index_controller.add_documents(index, method, format, &bytes, primary_key) }).await??; Ok(update.into()) } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index c0903a5e1..186ed3ddd 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -40,9 +40,10 @@ impl IndexController for LocalIndexController { method: milli::update::IndexDocumentsMethod, format: milli::update::UpdateFormat, data: &[u8], + primary_key: Option, ) -> anyhow::Result> { let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; - let meta = UpdateMeta::DocumentsAddition { method, format }; + let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; let pending = update_store.register_update(meta, data)?; Ok(pending.into()) } diff --git a/src/index_controller/local_index_controller/update_handler.rs b/src/index_controller/local_index_controller/update_handler.rs index 7b7487ffa..7102f6f50 100644 --- a/src/index_controller/local_index_controller/update_handler.rs +++ b/src/index_controller/local_index_controller/update_handler.rs @@ -1,19 +1,19 @@ +use std::collections::HashMap; use std::io; use std::sync::Arc; -use std::collections::HashMap; use anyhow::Result; use flate2::read::GzDecoder; use grenad::CompressionType; use log::info; +use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use milli::Index; -use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod}; use rayon::ThreadPool; -use crate::index_controller::updates::{Processing, Processed, Failed}; -use crate::index_controller::{UpdateResult, UpdateMeta, Settings, Facets}; -use crate::option::IndexerOpts; use super::update_store::HandleUpdate; +use crate::index_controller::updates::{Failed, Processed, Processing}; +use crate::index_controller::{Facets, Settings, UpdateMeta, UpdateResult}; +use crate::option::IndexerOpts; pub struct UpdateHandler { index: Arc, @@ -70,9 +70,19 @@ impl UpdateHandler { method: IndexDocumentsMethod, content: &[u8], update_builder: UpdateBuilder, + primary_key: Option<&str>, ) -> anyhow::Result { // We must use the write transaction of the update here. let mut wtxn = self.index.write_txn()?; + + // Set the primary key if not set already, ignore if already set. + match (self.index.primary_key(&wtxn)?, primary_key) { + (None, Some(ref primary_key)) => { + self.index.put_primary_key(&mut wtxn, primary_key)?; + } + _ => (), + } + let mut builder = update_builder.index_documents(&mut wtxn, &self.index); builder.update_format(format); builder.index_documents_method(method); @@ -84,14 +94,16 @@ impl UpdateHandler { Box::new(content) as Box }; - let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); + let result = builder.execute(reader, |indexing_step, update_id| { + info!("update {}: {:?}", update_id, indexing_step) + }); match result { Ok(addition_result) => wtxn .commit() .and(Ok(UpdateResult::DocumentsAddition(addition_result))) .map_err(Into::into), - Err(e) => Err(e.into()) + Err(e) => Err(e.into()), } } @@ -105,11 +117,15 @@ impl UpdateHandler { .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), - Err(e) => Err(e.into()) + Err(e) => Err(e.into()), } } - fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> anyhow::Result { + fn update_settings( + &self, + settings: &Settings, + update_builder: UpdateBuilder, + ) -> anyhow::Result { // We must use the write transaction of the update here. let mut wtxn = self.index.write_txn()?; let mut builder = update_builder.settings(&mut wtxn, &self.index); @@ -144,21 +160,22 @@ impl UpdateHandler { } } - let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); + let result = builder + .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); match result { Ok(()) => wtxn .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), - Err(e) => Err(e.into()) + Err(e) => Err(e.into()), } } fn update_facets( &self, levels: &Facets, - update_builder: UpdateBuilder + update_builder: UpdateBuilder, ) -> anyhow::Result { // We must use the write transaction of the update here. let mut wtxn = self.index.write_txn()?; @@ -174,7 +191,7 @@ impl UpdateHandler { .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), - Err(e) => Err(e.into()) + Err(e) => Err(e.into()), } } @@ -204,7 +221,7 @@ impl HandleUpdate for UpdateHandler { fn handle_update( &mut self, meta: Processing, - content: &[u8] + content: &[u8], ) -> Result, Failed> { use UpdateMeta::*; @@ -213,7 +230,17 @@ impl HandleUpdate for UpdateHandler { let update_builder = self.update_buidler(update_id); let result = match meta.meta() { - DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), + DocumentsAddition { + method, + format, + primary_key, + } => self.update_documents( + *format, + *method, + content, + update_builder, + primary_key.as_deref(), + ), ClearDocuments => self.clear_documents(update_builder), DeleteDocuments => self.delete_documents(content, update_builder), Settings(settings) => self.update_settings(settings, update_builder), diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index b881e3268..5bdaf8737 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -31,7 +31,11 @@ pub struct IndexMetadata { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum UpdateMeta { - DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat }, + DocumentsAddition { + method: IndexDocumentsMethod, + format: UpdateFormat, + primary_key: Option, + }, ClearDocuments, DeleteDocuments, Settings(Settings), @@ -128,6 +132,7 @@ pub trait IndexController { method: IndexDocumentsMethod, format: UpdateFormat, data: &[u8], + primary_key: Option, ) -> anyhow::Result; /// Clear all documents in the given index. diff --git a/src/routes/document.rs b/src/routes/document.rs index 9daaff6c8..0eb2bd463 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -127,17 +127,7 @@ async fn get_all_documents( #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct UpdateDocumentsQuery { - _primary_key: Option, -} - -async fn update_multiple_documents( - _data: web::Data, - _path: web::Path, - _params: web::Query, - _body: web::Json>, - _is_partial: bool, -) -> Result { - todo!() + primary_key: Option, } /// Route used when the payload type is "application/json" @@ -149,15 +139,16 @@ async fn update_multiple_documents( async fn add_documents_json( data: web::Data, path: web::Path, - _params: web::Query, + params: web::Query, body: Payload, ) -> Result { let addition_result = data .add_documents( path.into_inner().index_uid, - IndexDocumentsMethod::UpdateDocuments, + IndexDocumentsMethod::ReplaceDocuments, UpdateFormat::Json, - body + body, + params.primary_key.clone(), ).await; match addition_result { @@ -174,7 +165,7 @@ async fn add_documents_json( } -/// Default route for addign documents, this should return an error en redirect to the docuentation +/// Default route for adding documents, this should return an error and redirect to the documentation #[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] async fn add_documents_default( _data: web::Data, @@ -186,14 +177,49 @@ async fn add_documents_default( todo!() } +/// Default route for adding documents, this should return an error and redirect to the documentation #[put("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] +async fn update_documents_default( + _data: web::Data, + _path: web::Path, + _params: web::Query, + _body: web::Json>, +) -> Result { + error!("Unknown document type"); + todo!() +} + +#[put( + "/indexes/{index_uid}/documents", + wrap = "Authentication::Private", + guard = "guard_json", +)] async fn update_documents( data: web::Data, path: web::Path, params: web::Query, - body: web::Json>, + body: web::Payload, ) -> Result { - update_multiple_documents(data, path, params, body, true).await + let addition_result = data + .add_documents( + path.into_inner().index_uid, + IndexDocumentsMethod::UpdateDocuments, + UpdateFormat::Json, + body, + params.primary_key.clone(), + ).await; + + match addition_result { + Ok(update) => { + let value = serde_json::to_string(&update).unwrap(); + let response = HttpResponse::Ok().body(value); + Ok(response) + } + Err(e) => { + error!("{}", e); + todo!() + } + } } #[post(