I can index documents without meilisearch

This commit is contained in:
Irevoire 2022-09-21 12:01:46 +02:00 committed by Clément Renault
parent 7bbd75e1fb
commit 16eab7b337
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
13 changed files with 357 additions and 222 deletions

View file

@ -91,11 +91,12 @@ impl IndexScheduler {
let wake_up = SignalEvent::auto(true);
let processing_tasks = (OffsetDateTime::now_utc(), RoaringBitmap::new());
let file_store = FileStore::new(&update_file_path)?;
Ok(Self {
// by default there is no processing tasks
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store: FileStore::new(update_file_path)?,
file_store,
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
status: env.create_database(Some(db_name::STATUS))?,
kind: env.create_database(Some(db_name::KIND))?,
@ -274,166 +275,6 @@ impl IndexScheduler {
Ok(())
}
#[cfg(truc)]
fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
match batch {
Batch::One(task) => match &task.kind {
KindWithContent::ClearAllDocuments { index_name } => {
self.index(&index_name)?.clear_documents()?;
}
KindWithContent::RenameIndex {
index_name: _,
new_name,
} => {
if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) {
return Err(Error::IndexAlreadyExists(new_name.to_string()));
}
todo!("wait for @guigui insight");
}
KindWithContent::CreateIndex {
index_name,
primary_key,
} => {
if self
.available_index
.get(wtxn, &index_name)?
.unwrap_or(false)
{
return Err(Error::IndexAlreadyExists(index_name.to_string()));
}
self.available_index.put(wtxn, &index_name, &true)?;
// TODO: TAMO: give real info to the index
let index = Index::open(
index_name.to_string(),
index_name.to_string(),
100_000_000,
Arc::default(),
)?;
if let Some(primary_key) = primary_key {
index.update_primary_key(primary_key.to_string())?;
}
self.index_map
.write()
.map_err(|_| Error::CorruptedTaskQueue)?
.insert(index_name.to_string(), index.clone());
}
KindWithContent::DeleteIndex { index_name } => {
if !self.available_index.delete(wtxn, &index_name)? {
return Err(Error::IndexNotFound(index_name.to_string()));
}
if let Some(index) = self
.index_map
.write()
.map_err(|_| Error::CorruptedTaskQueue)?
.remove(index_name)
{
index.delete()?;
} else {
// TODO: TAMO: fix the path
std::fs::remove_file(index_name)?;
}
}
KindWithContent::SwapIndex { lhs, rhs } => {
if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) {
return Err(Error::IndexNotFound(lhs.to_string()));
}
if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) {
return Err(Error::IndexNotFound(rhs.to_string()));
}
let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?;
let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?;
// the bitmap are lazily created and thus may not exists.
if let Some(bitmap) = rhs_bitmap {
self.index_tasks.put(wtxn, lhs, &bitmap)?;
}
if let Some(bitmap) = lhs_bitmap {
self.index_tasks.put(wtxn, rhs, &bitmap)?;
}
let mut index_map = self
.index_map
.write()
.map_err(|_| Error::CorruptedTaskQueue)?;
let lhs_index = index_map.remove(lhs).unwrap();
let rhs_index = index_map.remove(rhs).unwrap();
index_map.insert(lhs.to_string(), rhs_index);
index_map.insert(rhs.to_string(), lhs_index);
}
_ => unreachable!(),
},
Batch::Cancel(_) => todo!(),
Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(),
Batch::Contiguous { tasks, kind } => {
// it's safe because you can't batch 0 contiguous tasks.
let first_task = &tasks[0];
// and the two kind of tasks we batch MUST have ONE index name.
let index_name = first_task.indexes().unwrap()[0];
let index = self.index(index_name)?;
match kind {
Kind::DocumentAddition => {
let content_files = tasks.iter().map(|task| match &task.kind {
KindWithContent::DocumentAddition { content_file, .. } => {
content_file.clone()
}
k => unreachable!(
"Internal error, `{:?}` is not supposed to be reachable here",
k.as_kind()
),
});
let results = index.update_documents(
IndexDocumentsMethod::UpdateDocuments,
None,
self.file_store.clone(),
content_files,
)?;
for (task, result) in tasks.iter_mut().zip(results) {
task.finished_at = Some(OffsetDateTime::now_utc());
match result {
Ok(_) => task.status = Status::Succeeded,
Err(_) => task.status = Status::Succeeded,
}
}
}
Kind::DocumentDeletion => {
let ids: Vec<_> = tasks
.iter()
.flat_map(|task| match &task.kind {
KindWithContent::DocumentDeletion { documents_ids, .. } => {
documents_ids.clone()
}
k => unreachable!(
"Internal error, `{:?}` is not supposed to be reachable here",
k.as_kind()
),
})
.collect();
let result = index.delete_documents(&ids);
for task in tasks.iter_mut() {
task.finished_at = Some(OffsetDateTime::now_utc());
match result {
Ok(_) => task.status = Status::Succeeded,
Err(_) => task.status = Status::Succeeded,
}
}
}
_ => unreachable!(),
}
}
Batch::Empty => todo!(),
}
Ok(())
}
/// Notify the scheduler there is or may be work to do.
pub fn notify(&self) {
self.wake_up.signal()
@ -443,34 +284,15 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use big_s::S;
use insta::assert_debug_snapshot;
use tempfile::TempDir;
use uuid::Uuid;
use crate::assert_smol_debug_snapshot;
use crate::{assert_smol_debug_snapshot, tests::index_scheduler};
use super::*;
fn new() -> IndexScheduler {
let dir = TempDir::new().unwrap();
IndexScheduler::new(
dir.path().join("db_path"),
dir.path().join("file_store"),
dir.path().join("indexes"),
100_000_000,
IndexerConfig::default(),
)
.unwrap()
}
#[test]
fn simple_new() {
new();
}
#[test]
fn register() {
let index_scheduler = new();
let (index_scheduler, _) = index_scheduler();
let kinds = [
KindWithContent::IndexCreation {
index_uid: S("catto"),
@ -541,4 +363,42 @@ mod tests {
assert_smol_debug_snapshot!(index_tasks, @r###"[("catto", RoaringBitmap<[0, 1, 3]>), ("doggo", RoaringBitmap<[4]>)]"###);
}
#[test]
fn document_addition() {
let (index_scheduler, _dir) = index_scheduler();
let content = r#"
{
"id": 1,
"doggo": "bob"
}"#;
let (uuid, mut file) = index_scheduler.file_store.new_update().unwrap();
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAddition {
index_uid: S("doggos"),
primary_key: Some(S("id")),
content_file: uuid,
documents_count: 100,
allow_index_creation: true,
})
.unwrap();
index_scheduler.tick().unwrap();
let doggos = index_scheduler.index("doggos").unwrap();
let rtxn = doggos.read_txn().unwrap();
let documents: Vec<_> = doggos
.all_documents(&rtxn)
.unwrap()
.collect::<std::result::Result<_, _>>()
.unwrap();
assert_smol_debug_snapshot!(documents, @r###"[{"id": Number(1), "doggo": String("bob")}]"###);
}
}