stream and chunk the data

Authored by Tamo on 2023-11-29 14:27:50 +01:00; committed by Clément Renault
parent be72326c0a
commit c83a33017e
2 changed files with 54 additions and 11 deletions

@@ -34,6 +34,7 @@ pub type TaskId = u32;
 use std::collections::{BTreeMap, HashMap};
 use std::fs::File;
+use std::io::{self, BufReader, Read};
 use std::ops::{Bound, RangeBounds};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
@@ -1279,18 +1280,60 @@ impl IndexScheduler {
     /// Once the tasks changes have been commited we must send all the tasks that were updated to our webhook if there is one.
     fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
         if let Some(ref url) = self.webhook_url {
-            let rtxn = self.env.read_txn()?;
-
-            // on average a task takes ~50 bytes
-            let mut buffer = Vec::with_capacity(updated.len() as usize * 50);
-
-            for id in updated {
-                let task = self.get_task(&rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
-                let _ = serde_json::to_writer(&mut buffer, &TaskView::from_task(&task));
-                buffer.push(b'\n');
-            }
-
-            let reader = GzEncoder::new(&buffer[..], Compression::default());
+            struct TaskReader<'a, 'b> {
+                rtxn: &'a RoTxn<'a>,
+                index_scheduler: &'a IndexScheduler,
+                tasks: &'b mut roaring::bitmap::Iter<'b>,
+                buffer: Vec<u8>,
+                written: usize,
+            }
+
+            impl<'a, 'b> Read for TaskReader<'a, 'b> {
+                fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
+                    if self.buffer.is_empty() {
+                        match self.tasks.next() {
+                            None => return Ok(0),
+                            Some(task_id) => {
+                                let task = self
+                                    .index_scheduler
+                                    .get_task(self.rtxn, task_id)
+                                    .map_err(io::Error::other)?
+                                    .ok_or_else(|| io::Error::other(Error::CorruptedTaskQueue))?;
+                                serde_json::to_writer(
+                                    &mut self.buffer,
+                                    &TaskView::from_task(&task),
+                                )?;
+                                self.buffer.push(b'\n');
+                            }
+                        }
+                    }
+
+                    let mut to_write = &self.buffer[self.written..];
+                    let wrote = io::copy(&mut to_write, &mut buf)?;
+                    self.written += wrote as usize;
+
+                    // we wrote everything and must refresh our buffer on the next call
+                    if self.written == self.buffer.len() {
+                        self.written = 0;
+                        self.buffer.clear();
+                    }
+
+                    Ok(wrote as usize)
+                }
+            }
+
+            let rtxn = self.env.read_txn()?;
+
+            let task_reader = TaskReader {
+                rtxn: &rtxn,
+                index_scheduler: self,
+                tasks: &mut updated.into_iter(),
+                buffer: Vec::with_capacity(50), // on average a task is around ~100 bytes
+                written: 0,
+            };
+
+            let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
             if let Err(e) = ureq::post(url).set("Content-Encoding", "gzip").send(reader) {
                 log::error!("While sending data to the webhook: {e}");
             }
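
For readers skimming the diff, the core of the change is a Read adapter that serializes one task per read() call, so the webhook payload is produced and gzipped as it is sent instead of being buffered whole up front. Below is a minimal, self-contained sketch of that pattern; NdjsonReader, the json! items, and the main() driver are hypothetical stand-ins for TaskReader, TaskView, and the scheduler plumbing, not code from this commit. It assumes the flate2, serde, and serde_json crates.

    use std::io::{self, BufReader, Read};

    use flate2::{read::GzEncoder, Compression};
    use serde::Serialize;

    /// Streams an iterator of serializable items as newline-delimited JSON.
    /// (Hypothetical illustration of the TaskReader pattern above.)
    struct NdjsonReader<I> {
        items: I,        // items that still have to be serialized
        buffer: Vec<u8>, // bytes of the current item not yet handed out
        written: usize,  // how much of `buffer` the caller has consumed
    }

    impl<I, T> Read for NdjsonReader<I>
    where
        I: Iterator<Item = T>,
        T: Serialize,
    {
        fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
            // Refill the buffer with the next item once it runs dry.
            if self.buffer.is_empty() {
                match self.items.next() {
                    None => return Ok(0), // iterator exhausted: signal EOF
                    Some(item) => {
                        serde_json::to_writer(&mut self.buffer, &item)?;
                        self.buffer.push(b'\n'); // one JSON document per line
                    }
                }
            }

            // Hand out as many pending bytes as fit in `buf`.
            let mut pending = &self.buffer[self.written..];
            let copied = io::copy(&mut pending, &mut buf)?;
            self.written += copied as usize;

            // Everything was consumed: start fresh on the next call.
            if self.written == self.buffer.len() {
                self.written = 0;
                self.buffer.clear();
            }

            Ok(copied as usize)
        }
    }

    fn main() -> io::Result<()> {
        let reader = NdjsonReader {
            items: (0..3).map(|uid| serde_json::json!({ "uid": uid })),
            buffer: Vec::new(),
            written: 0,
        };

        // Compress on the fly, as the commit does, then drain the stream.
        let mut gz = GzEncoder::new(BufReader::new(reader), Compression::default());
        let mut compressed = Vec::new();
        gz.read_to_end(&mut compressed)?;
        println!("sent {} compressed bytes", compressed.len());
        Ok(())
    }

Wrapping the adapter in BufReader before GzEncoder, as the commit does, lets the encoder pull reasonably sized chunks even though the adapter only ever hands out one serialized task at a time.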

@@ -45,7 +45,7 @@ struct WebhookHandle {
 async fn create_webhook_server() -> WebhookHandle {
     let mut log_builder = env_logger::Builder::new();
-    log_builder.parse_filters("debug");
+    log_builder.parse_filters("info");
     log_builder.init();

     let (sender, receiver) = mpsc::unbounded_channel();