Rename the ReceivePayload error variant

This commit is contained in:
Kerollmops 2022-12-13 15:07:35 +01:00
parent 526793b5b2
commit 5d5615ef45
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 25 additions and 31 deletions

View File

@ -95,8 +95,8 @@ pub enum PayloadError {
MalformedPayload(serde_json::error::Error), MalformedPayload(serde_json::error::Error),
#[error("A json payload is missing.")] #[error("A json payload is missing.")]
MissingPayload, MissingPayload,
#[error("Error while writing the playload to disk: `{0}`.")] #[error("Error while receiving the playload. `{0}`.")]
ReceivePayloadErr(Box<dyn std::error::Error + Send + Sync + 'static>), ReceivePayload(Box<dyn std::error::Error + Send + Sync + 'static>),
} }
impl ErrorCode for PayloadError { impl ErrorCode for PayloadError {
@ -128,7 +128,7 @@ impl ErrorCode for PayloadError {
}, },
PayloadError::MissingPayload => Code::MissingPayload, PayloadError::MissingPayload => Code::MissingPayload,
PayloadError::MalformedPayload(_) => Code::MalformedPayload, PayloadError::MalformedPayload(_) => Code::MalformedPayload,
PayloadError::ReceivePayloadErr(_) => Code::Internal, PayloadError::ReceivePayload(_) => Code::Internal,
} }
} }
} }

View File

@ -26,7 +26,7 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
use crate::analytics::{Analytics, DocumentDeletionKind}; use crate::analytics::{Analytics, DocumentDeletionKind};
use crate::error::MeilisearchHttpError; use crate::error::MeilisearchHttpError;
use crate::error::PayloadError::ReceivePayloadErr; use crate::error::PayloadError::ReceivePayload;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload; use crate::extractors::payload::Payload;
@ -232,33 +232,29 @@ async fn document_addition(
let (uuid, mut update_file) = index_scheduler.create_update_file()?; let (uuid, mut update_file) = index_scheduler.create_update_file()?;
let temp_file = match tempfile() { let temp_file = match tempfile() {
Ok(temp_file) => temp_file, Ok(file) => file,
Err(e) => { Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e))));
}
}; };
let async_file = File::from_std(temp_file); let async_file = File::from_std(temp_file);
let mut buffer = BufWriter::new(async_file); let mut buffer = BufWriter::new(async_file);
let mut buffer_write_size: usize = 0; let mut buffer_write_size: usize = 0;
while let Some(bytes) = body.next().await { while let Some(result) = body.next().await {
let byte = &bytes?; let byte = result?;
if byte.is_empty() && buffer_write_size == 0 { if byte.is_empty() && buffer_write_size == 0 {
return Err(MeilisearchHttpError::MissingPayload(format)); return Err(MeilisearchHttpError::MissingPayload(format));
} }
match buffer.write_all(byte).await { match buffer.write_all(&byte).await {
Ok(()) => buffer_write_size += 1, Ok(()) => buffer_write_size += 1,
Err(e) => { Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); }
}
};
} }
if let Err(e) = buffer.flush().await { if let Err(e) = buffer.flush().await {
return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
} }
if buffer_write_size == 0 { if buffer_write_size == 0 {
@ -266,23 +262,21 @@ async fn document_addition(
} }
if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await { if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
return Err(MeilisearchHttpError::Payload(ReceivePayloadErr(Box::new(e)))); return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
}; }
let read_file = buffer.into_inner().into_std().await; let read_file = buffer.into_inner().into_std().await;
let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = let documents_count = match format {
tokio::task::spawn_blocking(move || -> Result<_, MeilisearchHttpError> { PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
let documents_count = match format { PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?,
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?, };
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?, // we NEED to persist the file here because we moved the `udpate_file` in another task.
}; update_file.persist()?;
// we NEED to persist the file here because we moved the `udpate_file` in another task. Ok(documents_count)
update_file.persist()?; })
Ok(documents_count) .await;
})
.await;
let documents_count = match documents_count { let documents_count = match documents_count {
Ok(Ok(documents_count)) => documents_count as u64, Ok(Ok(documents_count)) => documents_count as u64,