change payload send to use stream methods

This commit is contained in:
mpostma 2021-03-22 19:49:21 +01:00
parent b690f1103a
commit 5f33672f0e
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A

View File

@ -82,7 +82,7 @@ impl IndexController {
uid: String, uid: String,
method: milli::update::IndexDocumentsMethod, method: milli::update::IndexDocumentsMethod,
format: milli::update::UpdateFormat, format: milli::update::UpdateFormat,
mut payload: Payload, payload: Payload,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> { ) -> anyhow::Result<UpdateStatus> {
let perform_update = |uuid| async move { let perform_update = |uuid| async move {
@ -97,17 +97,16 @@ impl IndexController {
// prevent dead_locking between the update_handle::update that waits for the update to be // prevent dead_locking between the update_handle::update that waits for the update to be
// registered and the update_actor that waits for the the payload to be sent to it. // registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move { tokio::task::spawn_local(async move {
while let Some(bytes) = payload.next().await { payload
match bytes { .map(|bytes| {
Ok(bytes) => { bytes.map_err(|e| {
let _ = sender.send(Ok(bytes)).await; Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
} })
Err(e) => { })
let error: Box<dyn std::error::Error + Sync + Send + 'static> = Box::new(e); .for_each(|r| async {
let _ = sender.send(Err(error)).await; let _ = sender.send(r).await;
} })
} .await
}
}); });
// This must be done *AFTER* spawning the task. // This must be done *AFTER* spawning the task.
@ -187,7 +186,11 @@ impl IndexController {
let uuid = self.uuid_resolver.create(uid.clone()).await?; let uuid = self.uuid_resolver.create(uid.clone()).await?;
let meta = self.index_handle.create_index(uuid, primary_key).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?;
let _ = self.update_handle.create(uuid).await?; let _ = self.update_handle.create(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta) Ok(meta)
} }
@ -218,7 +221,11 @@ impl IndexController {
for (uid, uuid) in uuids { for (uid, uuid) in uuids {
let meta = self.index_handle.get_index_meta(uuid).await?; let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
ret.push(meta); ret.push(meta);
} }
@ -271,7 +278,11 @@ impl IndexController {
let uuid = self.uuid_resolver.get(uid.clone()).await?; let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.update_index(uuid, index_settings).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta) Ok(meta)
} }
@ -284,7 +295,11 @@ impl IndexController {
pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> { pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> {
let uuid = self.uuid_resolver.get(uid.clone()).await?; let uuid = self.uuid_resolver.get(uid.clone()).await?;
let meta = self.index_handle.get_index_meta(uuid).await?; let meta = self.index_handle.get_index_meta(uuid).await?;
let meta = IndexMetadata { name: uid.clone(), uid, meta }; let meta = IndexMetadata {
name: uid.clone(),
uid,
meta,
};
Ok(meta) Ok(meta)
} }
} }