1176: fix race condition in document addition r=Kerollmops a=MarinPostma

As described in #1160, there was a race condition when updating settings and adding documents simultaneously. This was due to the schema update and the document addition being processed in two different transactions. This PR moves the schema update logic for the primary key into the same transaction as the document addition, while keeping the primary-key validity checks in the HTTP route so as not to break error reporting on the document addition route.

Closes #1160.
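The gist of the fix, as a minimal standalone sketch (the types below are simplified stand-ins, not the actual MeiliSearch/heed API): the user-supplied primary key now travels with the update and is resolved under the same write transaction that applies the documents, so a concurrent settings update can no longer land between the schema write and the document write.

```rust
#[derive(Debug)]
enum Error {
    MissingPrimaryKey,
}

#[derive(Default)]
struct Schema {
    primary_key: Option<String>,
}

// Stand-in for heed::RwTxn<MainT>: everything mutated through it commits or
// aborts as one unit.
struct WriteTxn {
    schema: Schema,
}

// Resolve the primary key *inside* the write transaction: prefer the key the
// schema already has, otherwise fall back to the one supplied with the update.
fn apply_addition(
    txn: &mut WriteTxn,
    documents: Vec<String>,
    user_primary_key: Option<String>,
) -> Result<(), Error> {
    let primary_key = match txn.schema.primary_key.clone() {
        Some(key) => key,
        None => {
            let key = user_primary_key.ok_or(Error::MissingPrimaryKey)?;
            txn.schema.primary_key = Some(key.clone());
            key
        }
    };

    // ... index `documents` under `primary_key`, still inside `txn` ...
    println!("indexing {} documents with primary key {:?}", documents.len(), primary_key);
    Ok(())
}

fn main() -> Result<(), Error> {
    let mut txn = WriteTxn { schema: Schema::default() };
    // Schema update and document addition now share one transaction.
    apply_addition(&mut txn, vec![r#"{"id": 1}"#.to_string()], Some("id".into()))?;
    Ok(())
}
```

The real change threads a `primary_key: Option<String>` from the HTTP route through `DocumentsAddition` down to `apply_addition`, as the diff below shows.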

Co-authored-by: mpostma <postma.marin@protonmail.com>
Co-authored-by: marin <postma.marin@protonmail.com>
bors[bot] 2021-02-02 09:26:32 +00:00 committed by GitHub
commit c984fa1071
5 changed files with 98 additions and 63 deletions

View File

@@ -23,6 +23,8 @@ pub struct DocumentsAddition<D> {
     updates_store: store::Updates,
     updates_results_store: store::UpdatesResults,
     updates_notifier: UpdateEventsEmitter,
+    // Whether the user explicitly set the primary key in the update
+    primary_key: Option<String>,
     documents: Vec<D>,
     is_partial: bool,
 }
@@ -39,6 +41,7 @@ impl<D> DocumentsAddition<D> {
             updates_notifier,
             documents: Vec::new(),
             is_partial: false,
+            primary_key: None,
         }
     }
@@ -53,9 +56,14 @@ impl<D> DocumentsAddition<D> {
             updates_notifier,
             documents: Vec::new(),
             is_partial: true,
+            primary_key: None,
         }
     }

+    pub fn set_primary_key(&mut self, primary_key: String) {
+        self.primary_key = Some(primary_key);
+    }
+
     pub fn update_document(&mut self, document: D) {
         self.documents.push(document);
     }
@@ -71,6 +79,7 @@ impl<D> DocumentsAddition<D> {
            self.updates_results_store,
            self.documents,
            self.is_partial,
+           self.primary_key,
        )?;
        Ok(update_id)
    }
@@ -88,6 +97,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
    updates_results_store: store::UpdatesResults,
    addition: Vec<D>,
    is_partial: bool,
+   primary_key: Option<String>,
) -> MResult<u64> {
    let mut values = Vec::with_capacity(addition.len());
    for add in addition {
@@ -99,9 +109,9 @@ pub fn push_documents_addition<D: serde::Serialize>(
    let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;

    let update = if is_partial {
-        Update::documents_partial(values)
+        Update::documents_partial(primary_key, values)
    } else {
-        Update::documents_addition(values)
+        Update::documents_addition(primary_key, values)
    };

    updates_store.put_update(writer, last_update_id, &update)?;
@@ -149,7 +159,8 @@ pub fn apply_addition(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    new_documents: Vec<IndexMap<String, Value>>,
-    partial: bool
+    partial: bool,
+    primary_key: Option<String>,
) -> MResult<()>
{
    let mut schema = match index.main.schema(writer)? {
@@ -162,7 +173,14 @@ pub fn apply_addition(
    let internal_docids = index.main.internal_docids(writer)?;
    let mut available_ids = DiscoverIds::new(&internal_docids);

-    let primary_key = schema.primary_key().ok_or(Error::MissingPrimaryKey)?;
+    let primary_key = match schema.primary_key() {
+        Some(primary_key) => primary_key.to_string(),
+        None => {
+            let name = primary_key.ok_or(Error::MissingPrimaryKey)?;
+            schema.set_primary_key(&name)?;
+            name
+        }
+    };

    // 1. store documents ids for future deletion
    let mut documents_additions = HashMap::new();
@@ -275,16 +293,18 @@ pub fn apply_documents_partial_addition(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    new_documents: Vec<IndexMap<String, Value>>,
+    primary_key: Option<String>,
) -> MResult<()> {
-    apply_addition(writer, index, new_documents, true)
+    apply_addition(writer, index, new_documents, true, primary_key)
}

pub fn apply_documents_addition(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    new_documents: Vec<IndexMap<String, Value>>,
+    primary_key: Option<String>,
) -> MResult<()> {
-    apply_addition(writer, index, new_documents, false)
+    apply_addition(writer, index, new_documents, false, primary_key)
}

pub fn reindex_all_documents(writer: &mut heed::RwTxn<MainT>, index: &store::Index) -> MResult<()> {

View File

@@ -52,16 +52,16 @@ impl Update {
         }
     }

-    fn documents_addition(documents: Vec<IndexMap<String, Value>>) -> Update {
+    fn documents_addition(primary_key: Option<String>, documents: Vec<IndexMap<String, Value>>) -> Update {
         Update {
-            data: UpdateData::DocumentsAddition(documents),
+            data: UpdateData::DocumentsAddition{ documents, primary_key },
             enqueued_at: Utc::now(),
         }
     }

-    fn documents_partial(documents: Vec<IndexMap<String, Value>>) -> Update {
+    fn documents_partial(primary_key: Option<String>, documents: Vec<IndexMap<String, Value>>) -> Update {
         Update {
-            data: UpdateData::DocumentsPartial(documents),
+            data: UpdateData::DocumentsPartial{ documents, primary_key },
             enqueued_at: Utc::now(),
         }
     }
@@ -85,8 +85,15 @@ impl Update {
 pub enum UpdateData {
     ClearAll,
     Customs(Vec<u8>),
-    DocumentsAddition(Vec<IndexMap<String, Value>>),
-    DocumentsPartial(Vec<IndexMap<String, Value>>),
+    // (primary key, documents)
+    DocumentsAddition {
+        primary_key: Option<String>,
+        documents: Vec<IndexMap<String, Value>>
+    },
+    DocumentsPartial {
+        primary_key: Option<String>,
+        documents: Vec<IndexMap<String, Value>>,
+    },
     DocumentsDeletion(Vec<String>),
     Settings(Box<SettingsUpdate>)
 }
@@ -96,11 +103,11 @@ impl UpdateData {
        match self {
            UpdateData::ClearAll => UpdateType::ClearAll,
            UpdateData::Customs(_) => UpdateType::Customs,
-            UpdateData::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
-                number: addition.len(),
+            UpdateData::DocumentsAddition{ documents, .. } => UpdateType::DocumentsAddition {
+                number: documents.len(),
            },
-            UpdateData::DocumentsPartial(addition) => UpdateType::DocumentsPartial {
-                number: addition.len(),
+            UpdateData::DocumentsPartial{ documents, .. } => UpdateType::DocumentsPartial {
+                number: documents.len(),
            },
            UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
                number: deletion.len(),
@@ -239,25 +246,25 @@ pub fn update_task(
                (update_type, result, start.elapsed())
            }
-            UpdateData::DocumentsAddition(documents) => {
+            UpdateData::DocumentsAddition { documents, primary_key } => {
                let start = Instant::now();

                let update_type = UpdateType::DocumentsAddition {
                    number: documents.len(),
                };

-                let result = apply_documents_addition(writer, index, documents);
+                let result = apply_documents_addition(writer, index, documents, primary_key);

                (update_type, result, start.elapsed())
            }
-            UpdateData::DocumentsPartial(documents) => {
+            UpdateData::DocumentsPartial{ documents, primary_key } => {
                let start = Instant::now();

                let update_type = UpdateType::DocumentsPartial {
                    number: documents.len(),
                };

-                let result = apply_documents_partial_addition(writer, index, documents);
+                let result = apply_documents_partial_addition(writer, index, documents, primary_key);

                (update_type, result, start.elapsed())
            }

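For context on why the variants above became struct-like: the enqueued update itself now records the optional primary key. Below is a standalone sketch of such a payload round-tripping through serde (variant and field names mirror the diff; the derive setup and JSON encoding are assumptions, the real update store may serialize differently).

```rust
use serde::{Deserialize, Serialize};
use serde_json::Value;

// Trimmed-down version of the UpdateData enum from the diff above.
#[derive(Serialize, Deserialize)]
enum UpdateData {
    ClearAll,
    DocumentsAddition {
        primary_key: Option<String>,
        documents: Vec<Value>,
    },
}

fn main() -> serde_json::Result<()> {
    for update in [
        UpdateData::ClearAll,
        UpdateData::DocumentsAddition {
            primary_key: Some("id".to_string()),
            documents: vec![serde_json::json!({ "id": 1, "title": "Test" })],
        },
    ] {
        // The enqueued update is persisted together with its primary key, so
        // the processing transaction sees exactly what the user sent.
        let json = serde_json::to_string(&update)?;
        println!("{}", json);
        let _roundtrip: UpdateData = serde_json::from_str(&json)?;
    }
    Ok(())
}
```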
View File

@@ -128,15 +128,15 @@ fn import_index_v1(
        // push document in buffer
        values.push(document?);
        // if buffer is full, create and apply a batch, and clean buffer
        if values.len() == document_batch_size {
            let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
-            apply_documents_addition(write_txn, &index, batch)?;
+            apply_documents_addition(write_txn, &index, batch, None)?;
        }
    }

    // apply documents remaining in the buffer
    if !values.is_empty() {
-        apply_documents_addition(write_txn, &index, values)?;
+        apply_documents_addition(write_txn, &index, values, None)?;
    }

    // sync index information: stats, updated_at, last_update
@@ -289,7 +289,6 @@ fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &
/// Write error with a context.
fn fail_dump_process<E: std::error::Error>(data: &web::Data<Data>, dump_info: DumpInfo, context: &str, error: E) {
    let error_message = format!("{}; {}", context, error);
-
    error!("Something went wrong during dump process: {}", &error_message);
    data.set_current_dump_info(dump_info.with_error(Error::dump_failed(error_message).into()))
}
@@ -405,7 +404,7 @@ pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<Dum
    let dumps_dir = dumps_dir.to_path_buf();
    let info_cloned = info.clone();
    // run dump process in a new thread
-    thread::spawn(move || 
+    thread::spawn(move ||
        dump_process(data, dumps_dir, info_cloned)
    );

View File

@@ -132,7 +132,7 @@ async fn get_all_documents(
    let limit = params.limit.unwrap_or(20);
    let index_uid = &path.index_uid;
    let reader = data.db.main_read_txn()?;
-
+
    let documents = get_all_documents_sync(
        &data,
        &reader,
@@ -145,15 +145,6 @@ async fn get_all_documents(
    Ok(HttpResponse::Ok().json(documents))
}

-fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
-    for key in document.keys() {
-        if key.to_lowercase().contains("id") {
-            return Some(key.to_string());
-        }
-    }
-    None
-}
-
#[derive(Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct UpdateDocumentsQuery {
@@ -168,26 +159,6 @@ async fn update_multiple_documents(
    is_partial: bool,
) -> Result<HttpResponse, ResponseError> {
    let update_id = data.get_or_create_index(&path.index_uid, |index| {
-        let reader = data.db.main_read_txn()?;
-
-        let mut schema = index
-            .main
-            .schema(&reader)?
-            .ok_or(meilisearch_core::Error::SchemaMissing)?;
-
-        if schema.primary_key().is_none() {
-            let id = match &params.primary_key {
-                Some(id) => id.to_string(),
-                None => body
-                    .first()
-                    .and_then(find_primary_key)
-                    .ok_or(meilisearch_core::Error::MissingPrimaryKey)?,
-            };
-
-            schema.set_primary_key(&id).map_err(Error::bad_request)?;
-            data.db.main_write(|w| index.main.put_schema(w, &schema))?;
-        }
-
        let mut document_addition = if is_partial {
            index.documents_partial_addition()
@@ -195,6 +166,27 @@ async fn update_multiple_documents(
            index.documents_addition()
        };

+        // Return an early error if primary key is already set, otherwise, try to set it up in the
+        // update later.
+        let reader = data.db.main_read_txn()?;
+        let schema = index
+            .main
+            .schema(&reader)?
+            .ok_or(meilisearch_core::Error::SchemaMissing)?;
+
+        match (params.into_inner().primary_key, schema.primary_key()) {
+            (Some(_), Some(_)) => return Err(meilisearch_schema::Error::PrimaryKeyAlreadyPresent)?,
+            (Some(key), None) => document_addition.set_primary_key(key),
+            (None, None) => {
+                let key = body
+                    .first()
+                    .and_then(find_primary_key)
+                    .ok_or(meilisearch_core::Error::MissingPrimaryKey)?;
+                document_addition.set_primary_key(key);
+            }
+            (None, Some(_)) => ()
+        }
+
        for document in body.into_inner() {
            document_addition.update_document(document);
        }
@@ -204,6 +196,15 @@ async fn update_multiple_documents(
    return Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)));
}

+fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
+    for key in document.keys() {
+        if key.to_lowercase().contains("id") {
+            return Some(key.to_string());
+        }
+    }
+    None
+}
+
#[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
async fn add_documents(
    data: web::Data<Data>,

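Given the new early check above, a companion test could look like the following hedged sketch, written in the style of the suite in the next file and assuming its harness and imports (`common::Server`, `serde_json::json`); the 400 status for `PrimaryKeyAlreadyPresent` is an assumption, not taken from the diff.

```rust
// Hypothetical companion test: once the schema has a primary key, passing a
// different ?primaryKey=... should fail before any update is enqueued.
#[actix_rt::test]
async fn reject_primary_key_when_already_set() {
    let mut server = common::Server::with_uid("test");
    let body = json!({ "uid": "test", "primaryKey": "id" });
    server.create_index(body).await;

    let url = "/indexes/test/documents?primaryKey=title";
    let body = json!([{ "id": 1, "title": "Test" }]);
    let (_response, status_code) = server.post_request(&url, body).await;
    // PrimaryKeyAlreadyPresent is reported synchronously; 400 is an assumed
    // status code for this error.
    assert_eq!(status_code, 400);
}
```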
View File

@@ -94,13 +94,21 @@ async fn return_update_status_of_pushed_documents() {
    ];

    let mut update_ids = Vec::new();

+    let mut bodies = bodies.into_iter();
    let url = "/indexes/test/documents?primaryKey=title";
+    let (response, status_code) = server.post_request(&url, bodies.next().unwrap()).await;
+    assert_eq!(status_code, 202);
+    let update_id = response["updateId"].as_u64().unwrap();
+    update_ids.push(update_id);
+    server.wait_update_id(update_id).await;
+
+    let url = "/indexes/test/documents";
    for body in bodies {
        let (response, status_code) = server.post_request(&url, body).await;
        assert_eq!(status_code, 202);
        let update_id = response["updateId"].as_u64().unwrap();
        update_ids.push(update_id);
    }

    // 2. Fetch the status of index.
@@ -173,7 +181,7 @@ async fn should_return_existing_update() {
    let (response, status_code) = server.create_index(body).await;
    assert_eq!(status_code, 201);
    assert_eq!(response["primaryKey"], json!(null));
-
+
    let body = json!([{
        "title": "Test",
        "comment": "comment test"