implements most operations

This commit is contained in:
Tamo 2022-09-07 22:16:49 +02:00 committed by Clément Renault
parent 5a7fcf2688
commit af0f5d6c0c
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 68 additions and 3 deletions

View File

@ -9,16 +9,19 @@ mod utils;
use batch::Batch; use batch::Batch;
pub use error::Error; pub use error::Error;
use index::Index; use index::Index;
use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
pub use task::Task; pub use task::Task;
use task::{Kind, KindWithContent, Status}; use task::{Kind, KindWithContent, Status};
use time::OffsetDateTime;
use update_file_store::UpdateFileStore;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{collections::HashMap, sync::RwLock}; use std::{collections::HashMap, sync::RwLock};
use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
use milli::update::IndexDocumentsMethod;
use milli::{RoaringBitmapCodec, BEU32}; use milli::{RoaringBitmapCodec, BEU32};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde::Deserialize; use serde::Deserialize;
@ -54,6 +57,8 @@ pub struct IndexScheduler {
/// The list of tasks currently processing. /// The list of tasks currently processing.
processing_tasks: Arc<RwLock<RoaringBitmap>>, processing_tasks: Arc<RwLock<RoaringBitmap>>,
file_store: UpdateFileStore,
/// The LMDB environment which the DBs are associated with. /// The LMDB environment which the DBs are associated with.
env: Env, env: Env,
@ -326,7 +331,66 @@ impl IndexScheduler {
Batch::Cancel(_) => todo!(), Batch::Cancel(_) => todo!(),
Batch::Snapshot(_) => todo!(), Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(), Batch::Dump(_) => todo!(),
Batch::Contiguous { tasks, kind } => todo!(), Batch::Contiguous { tasks, kind } => {
// it's safe because you can't batch 0 contiguous tasks.
let first_task = &tasks[0];
// and the two kind of tasks we batch MUST have ONE index name.
let index_name = first_task.indexes().unwrap()[0];
let index = self.index(index_name)?;
match kind {
Kind::DocumentAddition => {
let content_files = tasks.iter().map(|task| match &task.kind {
KindWithContent::DocumentAddition { content_file, .. } => {
content_file.clone()
}
k => unreachable!(
"Internal error, `{:?}` is not supposed to be reachable here",
k.as_kind()
),
});
let results = index.update_documents(
IndexDocumentsMethod::UpdateDocuments,
None,
self.file_store.clone(),
content_files,
)?;
for (task, result) in tasks.iter_mut().zip(results) {
task.finished_at = Some(OffsetDateTime::now_utc());
match result {
Ok(_) => task.status = Status::Succeeded,
Err(_) => task.status = Status::Succeeded,
}
}
}
Kind::DocumentDeletion => {
let ids: Vec<_> = tasks
.iter()
.flat_map(|task| match &task.kind {
KindWithContent::DocumentDeletion { documents_ids, .. } => {
documents_ids.clone()
}
k => unreachable!(
"Internal error, `{:?}` is not supposed to be reachable here",
k.as_kind()
),
})
.collect();
let result = index.delete_documents(&ids);
for task in tasks.iter_mut() {
task.finished_at = Some(OffsetDateTime::now_utc());
match result {
Ok(_) => task.status = Status::Succeeded,
Err(_) => task.status = Status::Succeeded,
}
}
}
_ => unreachable!(),
}
}
Batch::Empty => todo!(), Batch::Empty => todo!(),
} }

View File

@ -2,6 +2,7 @@ use anyhow::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid;
use crate::TaskId; use crate::TaskId;
@ -56,7 +57,7 @@ pub enum KindWithContent {
Snapshot, Snapshot,
DocumentAddition { DocumentAddition {
index_name: String, index_name: String,
content_file: String, content_file: Uuid,
}, },
DocumentDeletion { DocumentDeletion {
index_name: String, index_name: String,