diff --git a/meilisearch-core/src/update/documents_addition.rs b/meilisearch-core/src/update/documents_addition.rs index 00fdd5122..26bbd94b2 100644 --- a/meilisearch-core/src/update/documents_addition.rs +++ b/meilisearch-core/src/update/documents_addition.rs @@ -23,6 +23,8 @@ pub struct DocumentsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, updates_notifier: UpdateEventsEmitter, + // Whether the user explicitly set the primary key in the update + primary_key: Option, documents: Vec, is_partial: bool, } @@ -39,6 +41,7 @@ impl DocumentsAddition { updates_notifier, documents: Vec::new(), is_partial: false, + primary_key: None, } } @@ -53,9 +56,14 @@ impl DocumentsAddition { updates_notifier, documents: Vec::new(), is_partial: true, + primary_key: None, } } + pub fn set_primary_key(&mut self, primary_key: String) { + self.primary_key = Some(primary_key); + } + pub fn update_document(&mut self, document: D) { self.documents.push(document); } @@ -71,6 +79,7 @@ impl DocumentsAddition { self.updates_results_store, self.documents, self.is_partial, + self.primary_key, )?; Ok(update_id) } @@ -88,6 +97,7 @@ pub fn push_documents_addition( updates_results_store: store::UpdatesResults, addition: Vec, is_partial: bool, + primary_key: Option, ) -> MResult { let mut values = Vec::with_capacity(addition.len()); for add in addition { @@ -99,9 +109,9 @@ pub fn push_documents_addition( let last_update_id = next_update_id(writer, updates_store, updates_results_store)?; let update = if is_partial { - Update::documents_partial(values) + Update::documents_partial(primary_key, values) } else { - Update::documents_addition(values) + Update::documents_addition(primary_key, values) }; updates_store.put_update(writer, last_update_id, &update)?; @@ -149,7 +159,8 @@ pub fn apply_addition( writer: &mut heed::RwTxn, index: &store::Index, new_documents: Vec>, - partial: bool + partial: bool, + primary_key: Option, ) -> MResult<()> { let mut schema = match index.main.schema(writer)? { @@ -162,7 +173,14 @@ pub fn apply_addition( let internal_docids = index.main.internal_docids(writer)?; let mut available_ids = DiscoverIds::new(&internal_docids); - let primary_key = schema.primary_key().ok_or(Error::MissingPrimaryKey)?; + let primary_key = match schema.primary_key() { + Some(primary_key) => primary_key.to_string(), + None => { + let name = primary_key.ok_or(Error::MissingPrimaryKey)?; + schema.set_primary_key(&name)?; + name + } + }; // 1. store documents ids for future deletion let mut documents_additions = HashMap::new(); @@ -275,16 +293,18 @@ pub fn apply_documents_partial_addition( writer: &mut heed::RwTxn, index: &store::Index, new_documents: Vec>, + primary_key: Option, ) -> MResult<()> { - apply_addition(writer, index, new_documents, true) + apply_addition(writer, index, new_documents, true, primary_key) } pub fn apply_documents_addition( writer: &mut heed::RwTxn, index: &store::Index, new_documents: Vec>, + primary_key: Option, ) -> MResult<()> { - apply_addition(writer, index, new_documents, false) + apply_addition(writer, index, new_documents, false, primary_key) } pub fn reindex_all_documents(writer: &mut heed::RwTxn, index: &store::Index) -> MResult<()> { diff --git a/meilisearch-core/src/update/mod.rs b/meilisearch-core/src/update/mod.rs index d10f484a4..bcc03ec3f 100644 --- a/meilisearch-core/src/update/mod.rs +++ b/meilisearch-core/src/update/mod.rs @@ -52,16 +52,16 @@ impl Update { } } - fn documents_addition(documents: Vec>) -> Update { + fn documents_addition(primary_key: Option, documents: Vec>) -> Update { Update { - data: UpdateData::DocumentsAddition(documents), + data: UpdateData::DocumentsAddition{ documents, primary_key }, enqueued_at: Utc::now(), } } - fn documents_partial(documents: Vec>) -> Update { + fn documents_partial(primary_key: Option, documents: Vec>) -> Update { Update { - data: UpdateData::DocumentsPartial(documents), + data: UpdateData::DocumentsPartial{ documents, primary_key }, enqueued_at: Utc::now(), } } @@ -85,8 +85,15 @@ impl Update { pub enum UpdateData { ClearAll, Customs(Vec), - DocumentsAddition(Vec>), - DocumentsPartial(Vec>), + // (primary key, documents) + DocumentsAddition { + primary_key: Option, + documents: Vec> + }, + DocumentsPartial { + primary_key: Option, + documents: Vec>, + }, DocumentsDeletion(Vec), Settings(Box) } @@ -96,11 +103,11 @@ impl UpdateData { match self { UpdateData::ClearAll => UpdateType::ClearAll, UpdateData::Customs(_) => UpdateType::Customs, - UpdateData::DocumentsAddition(addition) => UpdateType::DocumentsAddition { - number: addition.len(), + UpdateData::DocumentsAddition{ documents, .. } => UpdateType::DocumentsAddition { + number: documents.len(), }, - UpdateData::DocumentsPartial(addition) => UpdateType::DocumentsPartial { - number: addition.len(), + UpdateData::DocumentsPartial{ documents, .. } => UpdateType::DocumentsPartial { + number: documents.len(), }, UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion { number: deletion.len(), @@ -239,25 +246,25 @@ pub fn update_task( (update_type, result, start.elapsed()) } - UpdateData::DocumentsAddition(documents) => { + UpdateData::DocumentsAddition { documents, primary_key } => { let start = Instant::now(); let update_type = UpdateType::DocumentsAddition { number: documents.len(), }; - let result = apply_documents_addition(writer, index, documents); + let result = apply_documents_addition(writer, index, documents, primary_key); (update_type, result, start.elapsed()) } - UpdateData::DocumentsPartial(documents) => { + UpdateData::DocumentsPartial{ documents, primary_key } => { let start = Instant::now(); let update_type = UpdateType::DocumentsPartial { number: documents.len(), }; - let result = apply_documents_partial_addition(writer, index, documents); + let result = apply_documents_partial_addition(writer, index, documents, primary_key); (update_type, result, start.elapsed()) } diff --git a/meilisearch-http/src/dump.rs b/meilisearch-http/src/dump.rs index c4513af6f..bf5752830 100644 --- a/meilisearch-http/src/dump.rs +++ b/meilisearch-http/src/dump.rs @@ -128,15 +128,15 @@ fn import_index_v1( // push document in buffer values.push(document?); // if buffer is full, create and apply a batch, and clean buffer - if values.len() == document_batch_size { + if values.len() == document_batch_size { let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size)); - apply_documents_addition(write_txn, &index, batch)?; + apply_documents_addition(write_txn, &index, batch, None)?; } } - // apply documents remaining in the buffer - if !values.is_empty() { - apply_documents_addition(write_txn, &index, values)?; + // apply documents remaining in the buffer + if !values.is_empty() { + apply_documents_addition(write_txn, &index, values, None)?; } // sync index information: stats, updated_at, last_update @@ -289,7 +289,6 @@ fn dump_index_documents(data: &web::Data, reader: &MainReader, dir_path: & /// Write error with a context. fn fail_dump_process(data: &web::Data, dump_info: DumpInfo, context: &str, error: E) { let error_message = format!("{}; {}", context, error); - error!("Something went wrong during dump process: {}", &error_message); data.set_current_dump_info(dump_info.with_error(Error::dump_failed(error_message).into())) } @@ -405,7 +404,7 @@ pub fn init_dump_process(data: &web::Data, dumps_dir: &Path) -> Result) -> Option { - for key in document.keys() { - if key.to_lowercase().contains("id") { - return Some(key.to_string()); - } - } - None -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] struct UpdateDocumentsQuery { @@ -168,26 +159,6 @@ async fn update_multiple_documents( is_partial: bool, ) -> Result { let update_id = data.get_or_create_index(&path.index_uid, |index| { - let reader = data.db.main_read_txn()?; - - let mut schema = index - .main - .schema(&reader)? - .ok_or(meilisearch_core::Error::SchemaMissing)?; - - if schema.primary_key().is_none() { - let id = match ¶ms.primary_key { - Some(id) => id.to_string(), - None => body - .first() - .and_then(find_primary_key) - .ok_or(meilisearch_core::Error::MissingPrimaryKey)?, - }; - - schema.set_primary_key(&id).map_err(Error::bad_request)?; - - data.db.main_write(|w| index.main.put_schema(w, &schema))?; - } let mut document_addition = if is_partial { index.documents_partial_addition() @@ -195,6 +166,27 @@ async fn update_multiple_documents( index.documents_addition() }; + // Return an early error if primary key is already set, otherwise, try to set it up in the + // update later. + let reader = data.db.main_read_txn()?; + let schema = index + .main + .schema(&reader)? + .ok_or(meilisearch_core::Error::SchemaMissing)?; + + match (params.into_inner().primary_key, schema.primary_key()) { + (Some(_), Some(_)) => return Err(meilisearch_schema::Error::PrimaryKeyAlreadyPresent)?, + (Some(key), None) => document_addition.set_primary_key(key), + (None, None) => { + let key = body + .first() + .and_then(find_primary_key) + .ok_or(meilisearch_core::Error::MissingPrimaryKey)?; + document_addition.set_primary_key(key); + } + (None, Some(_)) => () + } + for document in body.into_inner() { document_addition.update_document(document); } @@ -204,6 +196,15 @@ async fn update_multiple_documents( return Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id))); } +fn find_primary_key(document: &IndexMap) -> Option { + for key in document.keys() { + if key.to_lowercase().contains("id") { + return Some(key.to_string()); + } + } + None +} + #[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] async fn add_documents( data: web::Data, diff --git a/meilisearch-http/tests/index_update.rs b/meilisearch-http/tests/index_update.rs index df4639252..4d7e025a6 100644 --- a/meilisearch-http/tests/index_update.rs +++ b/meilisearch-http/tests/index_update.rs @@ -94,13 +94,21 @@ async fn return_update_status_of_pushed_documents() { ]; let mut update_ids = Vec::new(); - + let mut bodies = bodies.into_iter(); + let url = "/indexes/test/documents?primaryKey=title"; + let (response, status_code) = server.post_request(&url, bodies.next().unwrap()).await; + assert_eq!(status_code, 202); + let update_id = response["updateId"].as_u64().unwrap(); + update_ids.push(update_id); + server.wait_update_id(update_id).await; + + let url = "/indexes/test/documents"; for body in bodies { - let (response, status_code) = server.post_request(&url, body).await; - assert_eq!(status_code, 202); - let update_id = response["updateId"].as_u64().unwrap(); - update_ids.push(update_id); + let (response, status_code) = server.post_request(&url, body).await; + assert_eq!(status_code, 202); + let update_id = response["updateId"].as_u64().unwrap(); + update_ids.push(update_id); } // 2. Fetch the status of index. @@ -173,7 +181,7 @@ async fn should_return_existing_update() { let (response, status_code) = server.create_index(body).await; assert_eq!(status_code, 201); assert_eq!(response["primaryKey"], json!(null)); - + let body = json!([{ "title": "Test", "comment": "comment test"