Merge pull request #340 from meilisearch/separate-updates-kvstore

Separate the update and main databases
This commit is contained in:
Clément Renault 2019-11-27 11:39:14 +01:00 committed by GitHub
commit 0cea8ce5b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 498 additions and 414 deletions

8
Cargo.lock generated
View File

@ -636,7 +636,7 @@ dependencies = [
[[package]]
name = "heed"
version = "0.5.0"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -944,7 +944,7 @@ dependencies = [
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fst 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
"heed 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"heed 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"levenshtein_automata 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -975,7 +975,7 @@ dependencies = [
"chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"heed 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"heed 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"isahc 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2582,7 +2582,7 @@ dependencies = [
"checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462"
"checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum heed 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b021df76de18f82f716fa6c858fd6bf39aec2c651852055563b5aba51debca81"
"checksum heed 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df6c88807a125a2722484f62fa9c9615d85b0779a06467626db1279c32e287ba"
"checksum hermit-abi 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "307c3c9f937f38e3534b1d6447ecf090cafcc9744e4a6360e8b037b2cf5af120"
"checksum http 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "d7e06e336150b178206af098a055e3621e8336027e2b4d126bda0bc64824baaf"
"checksum http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d"

View File

@ -14,7 +14,7 @@ deunicode = "1.0.0"
env_logger = "0.7.0"
fst = { version = "0.3.5", default-features = false }
hashbrown = { version = "0.6.0", features = ["serde"] }
heed = "0.5.0"
heed = "0.6.0"
levenshtein_automata = { version = "0.1.1", features = ["fst_automaton"] }
log = "0.4.8"
meilisearch-schema = { path = "../meilisearch-schema", version = "0.8.0" }

View File

@ -113,24 +113,25 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
database.set_update_callback(Box::new(update_fn));
let env = &database.env;
let db = &database;
let schema = {
let string = fs::read_to_string(&command.schema)?;
toml::from_str(&string).unwrap()
};
let mut writer = env.write_txn().unwrap();
match index.main.schema(&writer)? {
let reader = db.main_read_txn().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
match index.main.schema(&reader)? {
Some(current_schema) => {
if current_schema != schema {
return Err(meilisearch_core::Error::SchemaDiffer.into());
}
writer.abort();
update_writer.abort();
}
None => {
index.schema_update(&mut writer, schema)?;
writer.commit().unwrap();
index.schema_update(&mut update_writer, schema)?;
update_writer.commit().unwrap();
}
}
@ -173,10 +174,10 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
println!();
let mut writer = env.write_txn().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
println!("committing update...");
let update_id = additions.finalize(&mut writer)?;
writer.commit().unwrap();
let update_id = additions.finalize(&mut update_writer)?;
update_writer.commit().unwrap();
max_update_id = max_update_id.max(update_id);
println!("committed update {}", update_id);
}
@ -316,12 +317,12 @@ fn crop_text(
}
fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<dyn Error>> {
let env = &database.env;
let db = &database;
let index = database
.open_index(&command.index_uid)
.expect("Could not find index");
let reader = env.read_txn().unwrap();
let reader = db.main_read_txn().unwrap();
let schema = index.main.schema(&reader)?;
reader.abort();
@ -339,7 +340,7 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
Ok(query) => {
let start_total = Instant::now();
let reader = env.read_txn().unwrap();
let reader = db.main_read_txn().unwrap();
let ref_index = &index;
let ref_reader = &reader;
@ -444,12 +445,12 @@ fn show_updates_command(
command: ShowUpdatesCommand,
database: Database,
) -> Result<(), Box<dyn Error>> {
let env = &database.env;
let db = &database;
let index = database
.open_index(&command.index_uid)
.expect("Could not find index");
let reader = env.read_txn().unwrap();
let reader = db.update_read_txn().unwrap();
let updates = index.all_updates_status(&reader)?;
println!("{:#?}", updates);
reader.abort();

View File

@ -8,6 +8,7 @@ use fst::{IntoStreamer, Streamer};
use levenshtein_automata::DFA;
use meilisearch_tokenizer::{is_cjk, split_query_string};
use crate::database::MainT;
use crate::error::MResult;
use crate::store;
@ -23,7 +24,7 @@ pub struct AutomatonProducer {
impl AutomatonProducer {
pub fn new(
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
query: &str,
main_store: store::Main,
postings_list_store: store::PostingsLists,
@ -131,7 +132,7 @@ pub fn normalize_str(string: &str) -> String {
}
fn split_best_frequency<'a>(
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
word: &'a str,
postings_lists_store: store::PostingsLists,
) -> MResult<Option<(&'a str, &'a str)>> {
@ -159,7 +160,7 @@ fn split_best_frequency<'a>(
}
fn generate_automatons(
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
query: &str,
main_store: store::Main,
postings_lists_store: store::PostingsLists,

View File

@ -14,8 +14,12 @@ use crate::{store, update, Index, MResult};
pub type BoxUpdateFn = Box<dyn Fn(&str, update::ProcessedUpdateResult) + Send + Sync + 'static>;
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct MainT;
pub struct UpdateT;
pub struct Database {
pub env: heed::Env,
env: heed::Env,
update_env: heed::Env,
common_store: heed::PolyDatabase,
indexes_store: heed::Database<Str, Unit>,
indexes: RwLock<HashMap<String, (Index, thread::JoinHandle<MResult<()>>)>>,
@ -45,6 +49,7 @@ pub type UpdateEventsEmitter = Sender<UpdateEvent>;
fn update_awaiter(
receiver: UpdateEvents,
env: heed::Env,
update_env: heed::Env,
index_uid: &str,
update_fn: Arc<ArcSwapFn>,
index: Index,
@ -52,42 +57,54 @@ fn update_awaiter(
let mut receiver = receiver.into_iter();
while let Some(UpdateEvent::NewUpdate) = receiver.next() {
loop {
// instantiate a main/parent transaction
let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed");
// We instantiate a *write* transaction to *block* the thread
// until the *other*, notifiying, thread commits
let result = update_env.typed_write_txn::<UpdateT>();
let update_reader = break_try!(result, "LMDB read transaction (update) begin failed");
// retrieve the update that needs to be processed
let result = index.updates.pop_front(&mut writer);
let result = index.updates.first_update(&update_reader);
let (update_id, update) = match break_try!(result, "pop front update failed") {
Some(value) => value,
None => {
debug!("no more updates");
writer.abort();
break;
}
};
// instantiate a nested transaction
let result = env.nested_write_txn(&mut writer);
let mut nested_writer = break_try!(result, "LMDB nested write transaction failed");
// do not keep the reader for too long
update_reader.abort();
// try to apply the update to the database using the nested transaction
let result = update::update_task(&mut nested_writer, index.clone(), update_id, update);
// instantiate a transaction to touch to the main env
let result = env.typed_write_txn::<MainT>();
let mut main_writer = break_try!(result, "LMDB nested write transaction failed");
// try to apply the update to the database using the main transaction
let result = update::update_task(&mut main_writer, &index, update_id, update);
let status = break_try!(result, "update task failed");
// commit the nested transaction if the update was successful, abort it otherwise
// commit the main transaction if the update was successful, abort it otherwise
if status.error.is_none() {
break_try!(nested_writer.commit(), "commit nested transaction failed");
break_try!(main_writer.commit(), "commit nested transaction failed");
} else {
nested_writer.abort()
main_writer.abort()
}
// write the result of the update in the updates-results store
let updates_results = index.updates_results;
let result = updates_results.put_update_result(&mut writer, update_id, &status);
// now that the update has been processed we can instantiate
// a transaction to move the result to the updates-results store
let result = update_env.typed_write_txn::<UpdateT>();
let mut update_writer = break_try!(result, "LMDB write transaction begin failed");
// always commit the main/parent transaction, even if the update was unsuccessful
// definitely remove the update from the updates store
index.updates.del_update(&mut update_writer, update_id)?;
// write the result of the updates-results store
let updates_results = index.updates_results;
let result = updates_results.put_update_result(&mut update_writer, update_id, &status);
// always commit the main transaction, even if the update was unsuccessful
break_try!(result, "update result store commit failed");
break_try!(writer.commit(), "update parent transaction failed");
break_try!(update_writer.commit(), "update transaction commit failed");
// call the user callback when the update and the result are written consistently
if let Some(ref callback) = *update_fn.load() {
@ -98,9 +115,11 @@ fn update_awaiter(
debug!("update loop system stopped");
let mut writer = env.write_txn()?;
store::clear(&mut writer, &index)?;
let mut writer = env.typed_write_txn::<MainT>()?;
let mut update_writer = update_env.typed_write_txn::<UpdateT>()?;
store::clear(&mut writer, &mut update_writer, &index)?;
writer.commit()?;
update_writer.commit()?;
debug!("store {} cleared", index_uid);
@ -109,12 +128,20 @@ fn update_awaiter(
impl Database {
pub fn open_or_create(path: impl AsRef<Path>) -> MResult<Database> {
fs::create_dir_all(path.as_ref())?;
let main_path = path.as_ref().join("main");
let update_path = path.as_ref().join("update");
fs::create_dir_all(&main_path)?;
let env = heed::EnvOpenOptions::new()
.map_size(10 * 1024 * 1024 * 1024) // 10GB
.max_dbs(3000)
.open(path)?;
.open(main_path)?;
fs::create_dir_all(&update_path)?;
let update_env = heed::EnvOpenOptions::new()
.map_size(10 * 1024 * 1024 * 1024) // 10GB
.max_dbs(3000)
.open(update_path)?;
let common_store = env.create_poly_database(Some("common"))?;
let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?;
@ -134,7 +161,7 @@ impl Database {
let mut indexes = HashMap::new();
for index_uid in must_open {
let (sender, receiver) = crossbeam_channel::bounded(100);
let index = match store::open(&env, &index_uid, sender.clone())? {
let index = match store::open(&env, &update_env, &index_uid, sender.clone())? {
Some(index) => index,
None => {
log::warn!(
@ -146,6 +173,7 @@ impl Database {
};
let env_clone = env.clone();
let update_env_clone = update_env.clone();
let index_clone = index.clone();
let name_clone = index_uid.clone();
let update_fn_clone = update_fn.clone();
@ -154,6 +182,7 @@ impl Database {
update_awaiter(
receiver,
env_clone,
update_env_clone,
&name_clone,
update_fn_clone,
index_clone,
@ -173,6 +202,7 @@ impl Database {
Ok(Database {
env,
update_env,
common_store,
indexes_store,
indexes: RwLock::new(indexes),
@ -196,12 +226,13 @@ impl Database {
Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists),
Entry::Vacant(entry) => {
let (sender, receiver) = crossbeam_channel::bounded(100);
let index = store::create(&self.env, name, sender)?;
let index = store::create(&self.env, &self.update_env, name, sender)?;
let mut writer = self.env.write_txn()?;
self.indexes_store.put(&mut writer, name, &())?;
let env_clone = self.env.clone();
let update_env_clone = self.update_env.clone();
let index_clone = index.clone();
let name_clone = name.to_owned();
let update_fn_clone = self.update_fn.clone();
@ -210,6 +241,7 @@ impl Database {
update_awaiter(
receiver,
env_clone,
update_env_clone,
&name_clone,
update_fn_clone,
index_clone,
@ -259,6 +291,22 @@ impl Database {
self.update_fn.swap(None);
}
pub fn main_read_txn(&self) -> heed::Result<heed::RoTxn<MainT>> {
self.env.typed_read_txn::<MainT>()
}
pub fn main_write_txn(&self) -> heed::Result<heed::RwTxn<MainT>> {
self.env.typed_write_txn::<MainT>()
}
pub fn update_read_txn(&self) -> heed::Result<heed::RoTxn<UpdateT>> {
self.update_env.typed_read_txn::<UpdateT>()
}
pub fn update_write_txn(&self) -> heed::Result<heed::RwTxn<UpdateT>> {
self.update_env.typed_write_txn::<UpdateT>()
}
pub fn copy_and_compact_to_path<P: AsRef<Path>>(&self, path: P) -> ZResult<File> {
self.env.copy_to_path(path, CompactionOption::Enabled)
}
@ -288,7 +336,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -313,9 +361,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -334,15 +382,15 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
}
@ -351,7 +399,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -376,9 +424,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -396,15 +444,15 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some());
}
@ -413,7 +461,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -434,9 +482,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -447,15 +495,15 @@ mod tests {
additions.update_document(doc1);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
}
@ -464,7 +512,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -489,9 +537,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -510,9 +558,9 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let _update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
let schema = {
let data = r#"
@ -537,7 +585,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -545,10 +593,10 @@ mod tests {
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
reader.abort();
update_reader.abort();
let mut additions = index.documents_addition();
@ -571,7 +619,7 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
@ -579,11 +627,13 @@ mod tests {
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
// even try to search for a document
let reader = db.main_read_txn().unwrap();
let results = index.query_builder().query(&reader, "21 ", 0..20).unwrap();
assert_matches!(results.len(), 1);
@ -617,7 +667,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -625,8 +675,8 @@ mod tests {
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some());
}
@ -635,7 +685,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -660,7 +710,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -683,17 +733,19 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
let reader = db.main_read_txn().unwrap();
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
assert!(document.is_none());
@ -713,7 +765,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -741,7 +793,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -764,17 +816,19 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
let reader = db.main_read_txn().unwrap();
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
assert!(document.is_none());
@ -807,17 +861,19 @@ mod tests {
partial_additions.update_document(partial_doc1);
partial_additions.update_document(partial_doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = partial_additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
let reader = db.main_read_txn().unwrap();
let document: Option<serde_json::Value> = index
.document(&reader, None, DocumentId(7900334843754999545))
.unwrap();
@ -846,7 +902,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Arc::new(Database::open_or_create(dir.path()).unwrap());
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let db_cloned = database.clone();
@ -877,7 +933,7 @@ mod tests {
};
// add a schema to the index
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -899,7 +955,7 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
@ -919,7 +975,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -944,7 +1000,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -967,15 +1023,14 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let reader = db.main_read_txn().unwrap();
let schema = index.main.schema(&reader).unwrap().unwrap();
let ranked_map = index.main.ranked_map(&reader).unwrap().unwrap();

View File

@ -18,7 +18,7 @@ pub mod serde;
pub mod store;
mod update;
pub use self::database::{BoxUpdateFn, Database};
pub use self::database::{BoxUpdateFn, Database, MainT, UpdateT};
pub use self::error::{Error, MResult};
pub use self::number::{Number, ParseNumberError};
pub use self::ranked_map::RankedMap;

View File

@ -9,6 +9,7 @@ use fst::{IntoStreamer, Streamer};
use sdset::SetBuf;
use slice_group_by::{GroupBy, GroupByMut};
use crate::database::MainT;
use crate::automaton::{Automaton, AutomatonGroup, AutomatonProducer, QueryEnhancer};
use crate::distinct_map::{BufferedDistinctMap, DistinctMap};
use crate::levenshtein::prefix_damerau_levenshtein;
@ -139,7 +140,7 @@ fn multiword_rewrite_matches(
}
fn fetch_raw_documents(
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
automatons_groups: &[AutomatonGroup],
query_enhancer: &QueryEnhancer,
searchables: Option<&ReorderedAttrs>,
@ -336,7 +337,7 @@ impl<'c, 'f, 'd> QueryBuilder<'c, 'f, 'd> {
pub fn query(
self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
query: &str,
range: Range<usize>,
) -> MResult<Vec<Document>> {
@ -374,7 +375,7 @@ impl<'c, 'f, 'd> QueryBuilder<'c, 'f, 'd> {
}
fn raw_query<'c, FI>(
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
query: &str,
range: Range<usize>,
@ -510,7 +511,7 @@ where
}
fn raw_query_with_distinct<'c, FI, FD>(
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
query: &str,
range: Range<usize>,
@ -765,8 +766,8 @@ mod tests {
}
pub fn add_synonym(&mut self, word: &str, new: SetBuf<&str>) {
let env = &self.database.env;
let mut writer = env.write_txn().unwrap();
let db = &self.database;
let mut writer = db.main_write_txn().unwrap();
let word = word.to_lowercase();
@ -809,8 +810,8 @@ mod tests {
let database = Database::open_or_create(&tempdir).unwrap();
let index = database.create_index("default").unwrap();
let env = &database.env;
let mut writer = env.write_txn().unwrap();
let db = &database;
let mut writer = db.main_write_txn().unwrap();
let mut words_fst = BTreeSet::new();
let mut postings_lists = HashMap::new();
@ -872,8 +873,8 @@ mod tests {
("apple", &[doc_char_index(0, 2, 2)][..]),
]);
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "iphone from apple", 0..20).unwrap();
@ -895,8 +896,8 @@ mod tests {
store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "hello", 0..20).unwrap();
@ -928,8 +929,8 @@ mod tests {
store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello"]));
store.add_synonym("salut", SetBuf::from_dirty(vec!["hello"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "sal", 0..20).unwrap();
@ -972,8 +973,8 @@ mod tests {
store.add_synonym("salutation", SetBuf::from_dirty(vec!["hello"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "salutution", 0..20).unwrap();
@ -1010,8 +1011,8 @@ mod tests {
store.add_synonym("bonjour", SetBuf::from_dirty(vec!["hello", "salut"]));
store.add_synonym("salut", SetBuf::from_dirty(vec!["hello", "bonjour"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "hello", 0..20).unwrap();
@ -1098,8 +1099,8 @@ mod tests {
SetBuf::from_dirty(vec!["NY", "new york", "new york city"]),
);
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "NY subway", 0..20).unwrap();
@ -1168,8 +1169,8 @@ mod tests {
store.add_synonym("NY", SetBuf::from_dirty(vec!["york new"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "NY", 0..20).unwrap();
@ -1226,8 +1227,8 @@ mod tests {
store.add_synonym("new york", SetBuf::from_dirty(vec!["NY"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "NY subway", 0..20).unwrap();
@ -1291,8 +1292,8 @@ mod tests {
SetBuf::from_dirty(vec!["NY", "new york", "new york city"]),
);
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "NY subway", 0..20).unwrap();
@ -1372,8 +1373,8 @@ mod tests {
);
store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "NY subway broken", 0..20).unwrap();
@ -1459,8 +1460,8 @@ mod tests {
);
store.add_synonym("underground train", SetBuf::from_dirty(vec!["subway"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder
@ -1559,8 +1560,8 @@ mod tests {
store.add_synonym("new york", SetBuf::from_dirty(vec!["new york city"]));
store.add_synonym("new york city", SetBuf::from_dirty(vec!["new york"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "new york big ", 0..20).unwrap();
@ -1596,8 +1597,8 @@ mod tests {
store.add_synonym("NY", SetBuf::from_dirty(vec!["new york city story"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "NY subway ", 0..20).unwrap();
@ -1646,8 +1647,8 @@ mod tests {
store.add_synonym("new york city", SetBuf::from_dirty(vec!["NYC"]));
store.add_synonym("subway", SetBuf::from_dirty(vec!["underground train"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder
@ -1679,8 +1680,8 @@ mod tests {
store.add_synonym("téléphone", SetBuf::from_dirty(vec!["iphone"]));
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "telephone", 0..20).unwrap();
@ -1741,8 +1742,8 @@ mod tests {
("case", &[doc_index(0, 1)][..]),
]);
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "i phone case", 0..20).unwrap();
@ -1769,8 +1770,8 @@ mod tests {
("engine", &[doc_index(1, 2)][..]),
]);
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "searchengine", 0..20).unwrap();
@ -1801,8 +1802,8 @@ mod tests {
("engine", &[doc_index(1, 3)][..]),
]);
let env = &store.database.env;
let reader = env.read_txn().unwrap();
let db = &store.database;
let reader = db.main_read_txn().unwrap();
let builder = store.query_builder();
let results = builder.query(&reader, "searchengine", 0..20).unwrap();

View File

@ -8,6 +8,7 @@ use serde_json::de::IoRead as SerdeJsonIoRead;
use serde_json::Deserializer as SerdeJsonDeserializer;
use serde_json::Error as SerdeJsonError;
use crate::database::MainT;
use crate::store::DocumentsFields;
use crate::DocumentId;
@ -50,7 +51,7 @@ impl From<heed::Error> for DeserializerError {
pub struct Deserializer<'a> {
pub document_id: DocumentId,
pub reader: &'a heed::RoTxn,
pub reader: &'a heed::RoTxn<MainT>,
pub documents_fields: DocumentsFields,
pub schema: &'a Schema,
pub attributes: Option<&'a HashSet<SchemaAttr>>,

View File

@ -1,6 +1,7 @@
use meilisearch_schema::{Schema, SchemaAttr, SchemaProps};
use serde::ser;
use crate::database::MainT;
use crate::raw_indexer::RawIndexer;
use crate::store::{DocumentsFields, DocumentsFieldsCounts};
use crate::{DocumentId, RankedMap};
@ -8,7 +9,7 @@ use crate::{DocumentId, RankedMap};
use super::{ConvertToNumber, ConvertToString, Indexer, SerializerError};
pub struct Serializer<'a, 'b> {
pub txn: &'a mut heed::RwTxn<'b>,
pub txn: &'a mut heed::RwTxn<'b, MainT>,
pub schema: &'a Schema,
pub document_store: DocumentsFields,
pub document_fields_counts: DocumentsFieldsCounts,
@ -191,7 +192,7 @@ impl<'a, 'b> ser::Serializer for Serializer<'a, 'b> {
}
pub struct MapSerializer<'a, 'b> {
txn: &'a mut heed::RwTxn<'b>,
txn: &'a mut heed::RwTxn<'b, MainT>,
schema: &'a Schema,
document_id: DocumentId,
document_store: DocumentsFields,
@ -254,7 +255,7 @@ impl<'a, 'b> ser::SerializeMap for MapSerializer<'a, 'b> {
}
pub struct StructSerializer<'a, 'b> {
txn: &'a mut heed::RwTxn<'b>,
txn: &'a mut heed::RwTxn<'b, MainT>,
schema: &'a Schema,
document_id: DocumentId,
document_store: DocumentsFields,
@ -297,7 +298,7 @@ impl<'a, 'b> ser::SerializeStruct for StructSerializer<'a, 'b> {
}
pub fn serialize_value<T: ?Sized>(
txn: &mut heed::RwTxn,
txn: &mut heed::RwTxn<MainT>,
attribute: SchemaAttr,
props: SchemaProps,
document_id: DocumentId,

View File

@ -1,4 +1,5 @@
use super::BEU64;
use crate::database::MainT;
use crate::DocumentId;
use heed::types::{ByteSlice, OwnedType};
use heed::Result as ZResult;
@ -12,7 +13,7 @@ pub struct DocsWords {
impl DocsWords {
pub fn put_doc_words(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
document_id: DocumentId,
words: &fst::Set,
) -> ZResult<()> {
@ -21,18 +22,18 @@ impl DocsWords {
self.docs_words.put(writer, &document_id, bytes)
}
pub fn del_doc_words(self, writer: &mut heed::RwTxn, document_id: DocumentId) -> ZResult<bool> {
pub fn del_doc_words(self, writer: &mut heed::RwTxn<MainT>, document_id: DocumentId) -> ZResult<bool> {
let document_id = BEU64::new(document_id.0);
self.docs_words.delete(writer, &document_id)
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.docs_words.clear(writer)
}
pub fn doc_words(
self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
document_id: DocumentId,
) -> ZResult<Option<fst::Set>> {
let document_id = BEU64::new(document_id.0);

View File

@ -1,4 +1,5 @@
use heed::types::{ByteSlice, OwnedType};
use crate::database::MainT;
use heed::Result as ZResult;
use meilisearch_schema::SchemaAttr;
@ -13,7 +14,7 @@ pub struct DocumentsFields {
impl DocumentsFields {
pub fn put_document_field(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
document_id: DocumentId,
attribute: SchemaAttr,
value: &[u8],
@ -24,7 +25,7 @@ impl DocumentsFields {
pub fn del_all_document_fields(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
document_id: DocumentId,
) -> ZResult<usize> {
let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
@ -32,13 +33,13 @@ impl DocumentsFields {
self.documents_fields.delete_range(writer, &(start..=end))
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.documents_fields.clear(writer)
}
pub fn document_attribute<'txn>(
self,
reader: &'txn heed::RoTxn,
reader: &'txn heed::RoTxn<MainT>,
document_id: DocumentId,
attribute: SchemaAttr,
) -> ZResult<Option<&'txn [u8]>> {
@ -48,7 +49,7 @@ impl DocumentsFields {
pub fn document_fields<'txn>(
self,
reader: &'txn heed::RoTxn,
reader: &'txn heed::RoTxn<MainT>,
document_id: DocumentId,
) -> ZResult<DocumentFieldsIter<'txn>> {
let start = DocumentAttrKey::new(document_id, SchemaAttr::min());

View File

@ -1,4 +1,5 @@
use super::DocumentAttrKey;
use crate::database::MainT;
use crate::DocumentId;
use heed::types::OwnedType;
use heed::Result as ZResult;
@ -12,7 +13,7 @@ pub struct DocumentsFieldsCounts {
impl DocumentsFieldsCounts {
pub fn put_document_field_count(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
document_id: DocumentId,
attribute: SchemaAttr,
value: u64,
@ -23,7 +24,7 @@ impl DocumentsFieldsCounts {
pub fn del_all_document_fields_counts(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
document_id: DocumentId,
) -> ZResult<usize> {
let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
@ -32,13 +33,13 @@ impl DocumentsFieldsCounts {
.delete_range(writer, &(start..=end))
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.documents_fields_counts.clear(writer)
}
pub fn document_field_count(
self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
document_id: DocumentId,
attribute: SchemaAttr,
) -> ZResult<Option<u64>> {
@ -51,7 +52,7 @@ impl DocumentsFieldsCounts {
pub fn document_fields_counts<'txn>(
self,
reader: &'txn heed::RoTxn,
reader: &'txn heed::RoTxn<MainT>,
document_id: DocumentId,
) -> ZResult<DocumentFieldsCountsIter<'txn>> {
let start = DocumentAttrKey::new(document_id, SchemaAttr::min());
@ -60,7 +61,7 @@ impl DocumentsFieldsCounts {
Ok(DocumentFieldsCountsIter { iter })
}
pub fn documents_ids<'txn>(self, reader: &'txn heed::RoTxn) -> ZResult<DocumentsIdsIter<'txn>> {
pub fn documents_ids<'txn>(self, reader: &'txn heed::RoTxn<MainT>) -> ZResult<DocumentsIdsIter<'txn>> {
let iter = self.documents_fields_counts.iter(reader)?;
Ok(DocumentsIdsIter {
last_seen_id: None,
@ -70,7 +71,7 @@ impl DocumentsFieldsCounts {
pub fn all_documents_fields_counts<'txn>(
self,
reader: &'txn heed::RoTxn,
reader: &'txn heed::RoTxn<MainT>,
) -> ZResult<AllDocumentsFieldsCountsIter<'txn>> {
let iter = self.documents_fields_counts.iter(reader)?;
Ok(AllDocumentsFieldsCountsIter { iter })

View File

@ -1,3 +1,4 @@
use crate::database::MainT;
use crate::RankedMap;
use chrono::{DateTime, Utc};
use heed::types::{ByteSlice, OwnedType, SerdeBincode, Str};
@ -28,46 +29,46 @@ pub struct Main {
}
impl Main {
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.main.clear(writer)
}
pub fn put_name(self, writer: &mut heed::RwTxn, name: &str) -> ZResult<()> {
self.main.put::<Str, Str>(writer, NAME_KEY, name)
pub fn put_name(self, writer: &mut heed::RwTxn<MainT>, name: &str) -> ZResult<()> {
self.main.put::<_, Str, Str>(writer, NAME_KEY, name)
}
pub fn name(self, reader: &heed::RoTxn) -> ZResult<Option<String>> {
pub fn name(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<String>> {
Ok(self
.main
.get::<Str, Str>(reader, NAME_KEY)?
.get::<_, Str, Str>(reader, NAME_KEY)?
.map(|name| name.to_owned()))
}
pub fn put_created_at(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn put_created_at(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.main
.put::<Str, SerdeDatetime>(writer, CREATED_AT_KEY, &Utc::now())
.put::<_, Str, SerdeDatetime>(writer, CREATED_AT_KEY, &Utc::now())
}
pub fn created_at(self, reader: &heed::RoTxn) -> ZResult<Option<DateTime<Utc>>> {
self.main.get::<Str, SerdeDatetime>(reader, CREATED_AT_KEY)
pub fn created_at(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<DateTime<Utc>>> {
self.main.get::<_, Str, SerdeDatetime>(reader, CREATED_AT_KEY)
}
pub fn put_updated_at(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn put_updated_at(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.main
.put::<Str, SerdeDatetime>(writer, UPDATED_AT_KEY, &Utc::now())
.put::<_, Str, SerdeDatetime>(writer, UPDATED_AT_KEY, &Utc::now())
}
pub fn updated_at(self, reader: &heed::RoTxn) -> ZResult<Option<DateTime<Utc>>> {
self.main.get::<Str, SerdeDatetime>(reader, UPDATED_AT_KEY)
pub fn updated_at(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<DateTime<Utc>>> {
self.main.get::<_, Str, SerdeDatetime>(reader, UPDATED_AT_KEY)
}
pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> {
pub fn put_words_fst(self, writer: &mut heed::RwTxn<MainT>, fst: &fst::Set) -> ZResult<()> {
let bytes = fst.as_fst().as_bytes();
self.main.put::<Str, ByteSlice>(writer, WORDS_KEY, bytes)
self.main.put::<_, Str, ByteSlice>(writer, WORDS_KEY, bytes)
}
pub fn words_fst(self, reader: &heed::RoTxn) -> ZResult<Option<fst::Set>> {
match self.main.get::<Str, ByteSlice>(reader, WORDS_KEY)? {
pub fn words_fst(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<fst::Set>> {
match self.main.get::<_, Str, ByteSlice>(reader, WORDS_KEY)? {
Some(bytes) => {
let len = bytes.len();
let bytes = Arc::new(bytes.to_owned());
@ -78,33 +79,33 @@ impl Main {
}
}
pub fn put_schema(self, writer: &mut heed::RwTxn, schema: &Schema) -> ZResult<()> {
pub fn put_schema(self, writer: &mut heed::RwTxn<MainT>, schema: &Schema) -> ZResult<()> {
self.main
.put::<Str, SerdeBincode<Schema>>(writer, SCHEMA_KEY, schema)
.put::<_, Str, SerdeBincode<Schema>>(writer, SCHEMA_KEY, schema)
}
pub fn schema(self, reader: &heed::RoTxn) -> ZResult<Option<Schema>> {
pub fn schema(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<Schema>> {
self.main
.get::<Str, SerdeBincode<Schema>>(reader, SCHEMA_KEY)
.get::<_, Str, SerdeBincode<Schema>>(reader, SCHEMA_KEY)
}
pub fn put_ranked_map(self, writer: &mut heed::RwTxn, ranked_map: &RankedMap) -> ZResult<()> {
pub fn put_ranked_map(self, writer: &mut heed::RwTxn<MainT>, ranked_map: &RankedMap) -> ZResult<()> {
self.main
.put::<Str, SerdeBincode<RankedMap>>(writer, RANKED_MAP_KEY, &ranked_map)
.put::<_, Str, SerdeBincode<RankedMap>>(writer, RANKED_MAP_KEY, &ranked_map)
}
pub fn ranked_map(self, reader: &heed::RoTxn) -> ZResult<Option<RankedMap>> {
pub fn ranked_map(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<RankedMap>> {
self.main
.get::<Str, SerdeBincode<RankedMap>>(reader, RANKED_MAP_KEY)
.get::<_, Str, SerdeBincode<RankedMap>>(reader, RANKED_MAP_KEY)
}
pub fn put_synonyms_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> {
pub fn put_synonyms_fst(self, writer: &mut heed::RwTxn<MainT>, fst: &fst::Set) -> ZResult<()> {
let bytes = fst.as_fst().as_bytes();
self.main.put::<Str, ByteSlice>(writer, SYNONYMS_KEY, bytes)
self.main.put::<_, Str, ByteSlice>(writer, SYNONYMS_KEY, bytes)
}
pub fn synonyms_fst(self, reader: &heed::RoTxn) -> ZResult<Option<fst::Set>> {
match self.main.get::<Str, ByteSlice>(reader, SYNONYMS_KEY)? {
pub fn synonyms_fst(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<fst::Set>> {
match self.main.get::<_, Str, ByteSlice>(reader, SYNONYMS_KEY)? {
Some(bytes) => {
let len = bytes.len();
let bytes = Arc::new(bytes.to_owned());
@ -115,14 +116,14 @@ impl Main {
}
}
pub fn put_stop_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> {
pub fn put_stop_words_fst(self, writer: &mut heed::RwTxn<MainT>, fst: &fst::Set) -> ZResult<()> {
let bytes = fst.as_fst().as_bytes();
self.main
.put::<Str, ByteSlice>(writer, STOP_WORDS_KEY, bytes)
.put::<_, Str, ByteSlice>(writer, STOP_WORDS_KEY, bytes)
}
pub fn stop_words_fst(self, reader: &heed::RoTxn) -> ZResult<Option<fst::Set>> {
match self.main.get::<Str, ByteSlice>(reader, STOP_WORDS_KEY)? {
pub fn stop_words_fst(self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<fst::Set>> {
match self.main.get::<_, Str, ByteSlice>(reader, STOP_WORDS_KEY)? {
Some(bytes) => {
let len = bytes.len();
let bytes = Arc::new(bytes.to_owned());
@ -133,20 +134,20 @@ impl Main {
}
}
pub fn put_number_of_documents<F>(self, writer: &mut heed::RwTxn, f: F) -> ZResult<u64>
pub fn put_number_of_documents<F>(self, writer: &mut heed::RwTxn<MainT>, f: F) -> ZResult<u64>
where
F: Fn(u64) -> u64,
{
let new = self.number_of_documents(writer).map(f)?;
let new = self.number_of_documents(&*writer).map(f)?;
self.main
.put::<Str, OwnedType<u64>>(writer, NUMBER_OF_DOCUMENTS_KEY, &new)?;
.put::<_, Str, OwnedType<u64>>(writer, NUMBER_OF_DOCUMENTS_KEY, &new)?;
Ok(new)
}
pub fn number_of_documents(self, reader: &heed::RoTxn) -> ZResult<u64> {
pub fn number_of_documents(self, reader: &heed::RoTxn<MainT>) -> ZResult<u64> {
match self
.main
.get::<Str, OwnedType<u64>>(reader, NUMBER_OF_DOCUMENTS_KEY)?
.get::<_, Str, OwnedType<u64>>(reader, NUMBER_OF_DOCUMENTS_KEY)?
{
Some(value) => Ok(value),
None => Ok(0),
@ -155,29 +156,29 @@ impl Main {
pub fn put_fields_frequency(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
fields_frequency: &FreqsMap,
) -> ZResult<()> {
self.main
.put::<Str, SerdeFreqsMap>(writer, FIELDS_FREQUENCY_KEY, fields_frequency)
.put::<_, Str, SerdeFreqsMap>(writer, FIELDS_FREQUENCY_KEY, fields_frequency)
}
pub fn fields_frequency(&self, reader: &heed::RoTxn) -> ZResult<Option<FreqsMap>> {
pub fn fields_frequency(&self, reader: &heed::RoTxn<MainT>) -> ZResult<Option<FreqsMap>> {
match self
.main
.get::<Str, SerdeFreqsMap>(reader, FIELDS_FREQUENCY_KEY)?
.get::<_, Str, SerdeFreqsMap>(reader, FIELDS_FREQUENCY_KEY)?
{
Some(freqs) => Ok(Some(freqs)),
None => Ok(None),
}
}
pub fn put_customs(self, writer: &mut heed::RwTxn, customs: &[u8]) -> ZResult<()> {
pub fn put_customs(self, writer: &mut heed::RwTxn<MainT>, customs: &[u8]) -> ZResult<()> {
self.main
.put::<Str, ByteSlice>(writer, CUSTOMS_KEY, customs)
.put::<_, Str, ByteSlice>(writer, CUSTOMS_KEY, customs)
}
pub fn customs<'txn>(self, reader: &'txn heed::RoTxn) -> ZResult<Option<&'txn [u8]>> {
self.main.get::<Str, ByteSlice>(reader, CUSTOMS_KEY)
pub fn customs<'txn>(self, reader: &'txn heed::RoTxn<MainT>) -> ZResult<Option<&'txn [u8]>> {
self.main.get::<_, Str, ByteSlice>(reader, CUSTOMS_KEY)
}
}

View File

@ -27,6 +27,7 @@ use zerocopy::{AsBytes, FromBytes};
use crate::criterion::Criteria;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::database::{MainT, UpdateT};
use crate::serde::Deserializer;
use crate::{query_builder::QueryBuilder, update, DocumentId, Error, MResult};
@ -98,7 +99,7 @@ pub struct Index {
impl Index {
pub fn document<T: de::DeserializeOwned>(
&self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
attributes: Option<&HashSet<&str>>,
document_id: DocumentId,
) -> MResult<Option<T>> {
@ -126,7 +127,7 @@ impl Index {
pub fn document_attribute<T: de::DeserializeOwned>(
&self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
document_id: DocumentId,
attribute: SchemaAttr,
) -> MResult<Option<T>> {
@ -139,12 +140,12 @@ impl Index {
}
}
pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult<u64> {
pub fn schema_update(&self, writer: &mut heed::RwTxn<UpdateT>, schema: Schema) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_schema_update(writer, self.updates, self.updates_results, schema)
}
pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec<u8>) -> ZResult<u64> {
pub fn customs_update(&self, writer: &mut heed::RwTxn<UpdateT>, customs: Vec<u8>) -> ZResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_customs_update(writer, self.updates, self.updates_results, customs)
}
@ -173,7 +174,7 @@ impl Index {
)
}
pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult<u64> {
pub fn clear_all(&self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_clear_all(writer, self.updates, self.updates_results)
}
@ -210,8 +211,8 @@ impl Index {
)
}
pub fn current_update_id(&self, reader: &heed::RoTxn) -> MResult<Option<u64>> {
match self.updates.last_update_id(reader)? {
pub fn current_update_id(&self, reader: &heed::RoTxn<UpdateT>) -> MResult<Option<u64>> {
match self.updates.last_update(reader)? {
Some((id, _)) => Ok(Some(id)),
None => Ok(None),
}
@ -219,18 +220,18 @@ impl Index {
pub fn update_status(
&self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<UpdateT>,
update_id: u64,
) -> MResult<Option<update::UpdateStatus>> {
update::update_status(reader, self.updates, self.updates_results, update_id)
}
pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
pub fn all_updates_status(&self, reader: &heed::RoTxn<UpdateT>) -> MResult<Vec<update::UpdateStatus>> {
let mut updates = Vec::new();
let mut last_update_result_id = 0;
// retrieve all updates results
if let Some((last_id, _)) = self.updates_results.last_update_id(reader)? {
if let Some((last_id, _)) = self.updates_results.last_update(reader)? {
updates.reserve(last_id as usize);
for id in 0..=last_id {
@ -242,7 +243,7 @@ impl Index {
}
// retrieve all enqueued updates
if let Some((last_id, _)) = self.updates.last_update_id(reader)? {
if let Some((last_id, _)) = self.updates.last_update(reader)? {
for id in last_update_result_id + 1..=last_id {
if let Some(update) = self.update_status(reader, id)? {
updates.push(update);
@ -278,6 +279,7 @@ impl Index {
pub fn create(
env: &heed::Env,
update_env: &heed::Env,
name: &str,
updates_notifier: UpdateEventsEmitter,
) -> MResult<Index> {
@ -298,8 +300,8 @@ pub fn create(
let documents_fields_counts = env.create_database(Some(&documents_fields_counts_name))?;
let synonyms = env.create_database(Some(&synonyms_name))?;
let docs_words = env.create_database(Some(&docs_words_name))?;
let updates = env.create_database(Some(&updates_name))?;
let updates_results = env.create_database(Some(&updates_results_name))?;
let updates = update_env.create_database(Some(&updates_name))?;
let updates_results = update_env.create_database(Some(&updates_results_name))?;
Ok(Index {
main: Main { main },
@ -318,6 +320,7 @@ pub fn create(
pub fn open(
env: &heed::Env,
update_env: &heed::Env,
name: &str,
updates_notifier: UpdateEventsEmitter,
) -> MResult<Option<Index>> {
@ -356,11 +359,11 @@ pub fn open(
Some(docs_words) => docs_words,
None => return Ok(None),
};
let updates = match env.open_database(Some(&updates_name))? {
let updates = match update_env.open_database(Some(&updates_name))? {
Some(updates) => updates,
None => return Ok(None),
};
let updates_results = match env.open_database(Some(&updates_results_name))? {
let updates_results = match update_env.open_database(Some(&updates_results_name))? {
Some(updates_results) => updates_results,
None => return Ok(None),
};
@ -380,7 +383,11 @@ pub fn open(
}))
}
pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> {
pub fn clear(
writer: &mut heed::RwTxn<MainT>,
update_writer: &mut heed::RwTxn<UpdateT>,
index: &Index,
) -> MResult<()> {
// clear all the stores
index.main.clear(writer)?;
index.postings_lists.clear(writer)?;
@ -388,7 +395,7 @@ pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> {
index.documents_fields_counts.clear(writer)?;
index.synonyms.clear(writer)?;
index.docs_words.clear(writer)?;
index.updates.clear(writer)?;
index.updates_results.clear(writer)?;
index.updates.clear(update_writer)?;
index.updates_results.clear(update_writer)?;
Ok(())
}

View File

@ -1,4 +1,5 @@
use crate::DocIndex;
use crate::database::MainT;
use heed::types::{ByteSlice, CowSlice};
use heed::Result as ZResult;
use sdset::{Set, SetBuf};
@ -12,24 +13,24 @@ pub struct PostingsLists {
impl PostingsLists {
pub fn put_postings_list(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
word: &[u8],
words_indexes: &Set<DocIndex>,
) -> ZResult<()> {
self.postings_lists.put(writer, word, words_indexes)
}
pub fn del_postings_list(self, writer: &mut heed::RwTxn, word: &[u8]) -> ZResult<bool> {
pub fn del_postings_list(self, writer: &mut heed::RwTxn<MainT>, word: &[u8]) -> ZResult<bool> {
self.postings_lists.delete(writer, word)
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.postings_lists.clear(writer)
}
pub fn postings_list<'txn>(
self,
reader: &'txn heed::RoTxn,
reader: &'txn heed::RoTxn<MainT>,
word: &[u8],
) -> ZResult<Option<Cow<'txn, Set<DocIndex>>>> {
match self.postings_lists.get(reader, word)? {

View File

@ -1,4 +1,5 @@
use heed::types::ByteSlice;
use crate::database::MainT;
use heed::Result as ZResult;
use std::sync::Arc;
@ -10,7 +11,7 @@ pub struct Synonyms {
impl Synonyms {
pub fn put_synonyms(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
word: &[u8],
synonyms: &fst::Set,
) -> ZResult<()> {
@ -18,15 +19,15 @@ impl Synonyms {
self.synonyms.put(writer, word, bytes)
}
pub fn del_synonyms(self, writer: &mut heed::RwTxn, word: &[u8]) -> ZResult<bool> {
pub fn del_synonyms(self, writer: &mut heed::RwTxn<MainT>, word: &[u8]) -> ZResult<bool> {
self.synonyms.delete(writer, word)
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<()> {
self.synonyms.clear(writer)
}
pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult<Option<fst::Set>> {
pub fn synonyms(self, reader: &heed::RoTxn<MainT>, word: &[u8]) -> ZResult<Option<fst::Set>> {
match self.synonyms.get(reader, word)? {
Some(bytes) => {
let len = bytes.len();

View File

@ -1,4 +1,5 @@
use super::BEU64;
use crate::database::UpdateT;
use crate::update::Update;
use heed::types::{OwnedType, SerdeJson};
use heed::Result as ZResult;
@ -10,7 +11,7 @@ pub struct Updates {
impl Updates {
// TODO do not trigger deserialize if possible
pub fn last_update_id(self, reader: &heed::RoTxn) -> ZResult<Option<(u64, Update)>> {
pub fn last_update(self, reader: &heed::RoTxn<UpdateT>) -> ZResult<Option<(u64, Update)>> {
match self.updates.last(reader)? {
Some((key, data)) => Ok(Some((key.get(), data))),
None => Ok(None),
@ -18,7 +19,7 @@ impl Updates {
}
// TODO do not trigger deserialize if possible
fn first_update_id(self, reader: &heed::RoTxn) -> ZResult<Option<(u64, Update)>> {
pub fn first_update(self, reader: &heed::RoTxn<UpdateT>) -> ZResult<Option<(u64, Update)>> {
match self.updates.first(reader)? {
Some((key, data)) => Ok(Some((key.get(), data))),
None => Ok(None),
@ -26,14 +27,14 @@ impl Updates {
}
// TODO do not trigger deserialize if possible
pub fn get(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<Option<Update>> {
pub fn get(self, reader: &heed::RoTxn<UpdateT>, update_id: u64) -> ZResult<Option<Update>> {
let update_id = BEU64::new(update_id);
self.updates.get(reader, &update_id)
}
pub fn put_update(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
update_id: u64,
update: &Update,
) -> ZResult<()> {
@ -42,8 +43,13 @@ impl Updates {
self.updates.put(writer, &update_id, update)
}
pub fn pop_front(self, writer: &mut heed::RwTxn) -> ZResult<Option<(u64, Update)>> {
match self.first_update_id(writer)? {
pub fn del_update(self, writer: &mut heed::RwTxn<UpdateT>, update_id: u64) -> ZResult<bool> {
let update_id = BEU64::new(update_id);
self.updates.delete(writer, &update_id)
}
pub fn pop_front(self, writer: &mut heed::RwTxn<UpdateT>) -> ZResult<Option<(u64, Update)>> {
match self.first_update(writer)? {
Some((update_id, update)) => {
let key = BEU64::new(update_id);
self.updates.delete(writer, &key)?;
@ -53,7 +59,7 @@ impl Updates {
}
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<UpdateT>) -> ZResult<()> {
self.updates.clear(writer)
}
}

View File

@ -1,4 +1,5 @@
use super::BEU64;
use crate::database::UpdateT;
use crate::update::ProcessedUpdateResult;
use heed::types::{OwnedType, SerdeJson};
use heed::Result as ZResult;
@ -9,9 +10,9 @@ pub struct UpdatesResults {
}
impl UpdatesResults {
pub fn last_update_id(
pub fn last_update(
self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<UpdateT>,
) -> ZResult<Option<(u64, ProcessedUpdateResult)>> {
match self.updates_results.last(reader)? {
Some((key, data)) => Ok(Some((key.get(), data))),
@ -21,7 +22,7 @@ impl UpdatesResults {
pub fn put_update_result(
self,
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
update_id: u64,
update_result: &ProcessedUpdateResult,
) -> ZResult<()> {
@ -31,14 +32,14 @@ impl UpdatesResults {
pub fn update_result(
self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<UpdateT>,
update_id: u64,
) -> ZResult<Option<ProcessedUpdateResult>> {
let update_id = BEU64::new(update_id);
self.updates_results.get(reader, &update_id)
}
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
pub fn clear(self, writer: &mut heed::RwTxn<UpdateT>) -> ZResult<()> {
self.updates_results.clear(writer)
}
}

View File

@ -1,8 +1,9 @@
use crate::database::{MainT, UpdateT};
use crate::update::{next_update_id, Update};
use crate::{store, MResult, RankedMap};
pub fn apply_clear_all(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
@ -21,7 +22,7 @@ pub fn apply_clear_all(
}
pub fn push_clear_all(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
) -> MResult<u64> {

View File

@ -1,9 +1,11 @@
use crate::store;
use crate::update::{next_update_id, Update};
use heed::Result as ZResult;
use crate::database::{MainT, UpdateT};
use crate::store;
use crate::update::{next_update_id, Update};
pub fn apply_customs_update(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
customs: &[u8],
) -> ZResult<()> {
@ -11,7 +13,7 @@ pub fn apply_customs_update(
}
pub fn push_customs_update(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
customs: Vec<u8>,

View File

@ -4,6 +4,7 @@ use fst::{set::OpBuilder, SetBuilder};
use sdset::{duo::Union, SetOperation};
use serde::{Deserialize, Serialize};
use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer};
@ -52,7 +53,7 @@ impl<D> DocumentsAddition<D> {
self.documents.push(document);
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64>
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64>
where
D: serde::Serialize,
{
@ -75,7 +76,7 @@ impl<D> Extend<D> for DocumentsAddition<D> {
}
pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: Vec<D>,
@ -102,7 +103,7 @@ pub fn push_documents_addition<D: serde::Serialize>(
}
pub fn apply_documents_addition<'a, 'b>(
writer: &'a mut heed::RwTxn<'b>,
writer: &'a mut heed::RwTxn<'b, MainT>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
@ -181,7 +182,7 @@ pub fn apply_documents_addition<'a, 'b>(
}
pub fn apply_documents_partial_addition<'a, 'b>(
writer: &'a mut heed::RwTxn<'b>,
writer: &'a mut heed::RwTxn<'b, MainT>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
@ -277,7 +278,7 @@ pub fn apply_documents_partial_addition<'a, 'b>(
}
pub fn reindex_all_documents(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
@ -354,7 +355,7 @@ pub fn reindex_all_documents(
}
pub fn write_documents_addition_index(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,

View File

@ -4,6 +4,7 @@ use fst::{SetBuilder, Streamer};
use meilisearch_schema::Schema;
use sdset::{duo::DifferenceByKey, SetBuf, SetOperation};
use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::serde::extract_document_id;
use crate::store;
@ -50,7 +51,7 @@ impl DocumentsDeletion {
Ok(())
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_documents_deletion(
writer,
@ -69,7 +70,7 @@ impl Extend<DocumentId> for DocumentsDeletion {
}
pub fn push_documents_deletion(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: Vec<DocumentId>,
@ -83,7 +84,7 @@ pub fn push_documents_deletion(
}
pub fn apply_documents_deletion(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,

View File

@ -30,6 +30,7 @@ use log::debug;
use serde::{Deserialize, Serialize};
use crate::{store, DocumentId, MResult};
use crate::database::{MainT, UpdateT};
use meilisearch_schema::Schema;
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -203,14 +204,14 @@ pub enum UpdateStatus {
}
pub fn update_status(
reader: &heed::RoTxn,
update_reader: &heed::RoTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
update_id: u64,
) -> MResult<Option<UpdateStatus>> {
match updates_results_store.update_result(reader, update_id)? {
match updates_results_store.update_result(update_reader, update_id)? {
Some(result) => Ok(Some(UpdateStatus::Processed { content: result })),
None => match updates_store.get(reader, update_id)? {
None => match updates_store.get(update_reader, update_id)? {
Some(update) => Ok(Some(UpdateStatus::Enqueued {
content: EnqueuedUpdateResult {
update_id,
@ -224,25 +225,25 @@ pub fn update_status(
}
pub fn next_update_id(
writer: &mut heed::RwTxn,
update_writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
) -> ZResult<u64> {
let last_update_id = updates_store.last_update_id(writer)?;
let last_update_id = last_update_id.map(|(n, _)| n);
let last_update = updates_store.last_update(update_writer)?;
let last_update = last_update.map(|(n, _)| n);
let last_update_results_id = updates_results_store.last_update_id(writer)?;
let last_update_results_id = updates_results_store.last_update(update_writer)?;
let last_update_results_id = last_update_results_id.map(|(n, _)| n);
let max_update_id = cmp::max(last_update_id, last_update_results_id);
let max_update_id = cmp::max(last_update, last_update_results_id);
let new_update_id = max_update_id.map_or(0, |n| n + 1);
Ok(new_update_id)
}
pub fn update_task<'a, 'b>(
writer: &'a mut heed::RwTxn<'b>,
index: store::Index,
writer: &'a mut heed::RwTxn<'b, MainT>,
index: &store::Index,
update_id: u64,
update: Update,
) -> MResult<ProcessedUpdateResult> {

View File

@ -1,11 +1,12 @@
use meilisearch_schema::{Diff, Schema};
use crate::database::{MainT, UpdateT};
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{error::UnsupportedOperation, store, MResult};
pub fn apply_schema_update(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
new_schema: &Schema,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
@ -61,7 +62,7 @@ pub fn apply_schema_update(
}
pub fn push_schema_update(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
schema: Schema,

View File

@ -2,6 +2,7 @@ use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder};
use crate::database::{MainT, UpdateT};
use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update};
@ -33,7 +34,7 @@ impl StopWordsAddition {
self.stop_words.insert(stop_word);
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_stop_words_addition(
writer,
@ -46,7 +47,7 @@ impl StopWordsAddition {
}
pub fn push_stop_words_addition(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: BTreeSet<String>,
@ -60,7 +61,7 @@ pub fn push_stop_words_addition(
}
pub fn apply_stop_words_addition(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
postings_lists_store: store::PostingsLists,
addition: BTreeSet<String>,

View File

@ -2,6 +2,7 @@ use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder};
use crate::database::{MainT, UpdateT};
use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::documents_addition::reindex_all_documents;
@ -34,7 +35,7 @@ impl StopWordsDeletion {
self.stop_words.insert(stop_word);
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_stop_words_deletion(
writer,
@ -47,7 +48,7 @@ impl StopWordsDeletion {
}
pub fn push_stop_words_deletion(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: BTreeSet<String>,
@ -61,7 +62,7 @@ pub fn push_stop_words_deletion(
}
pub fn apply_stop_words_deletion(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,

View File

@ -3,6 +3,7 @@ use std::collections::BTreeMap;
use fst::{set::OpBuilder, SetBuilder};
use sdset::SetBuf;
use crate::database::{MainT, UpdateT};
use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update};
@ -43,7 +44,7 @@ impl SynonymsAddition {
.extend(alternatives);
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_synonyms_addition(
writer,
@ -56,7 +57,7 @@ impl SynonymsAddition {
}
pub fn push_synonyms_addition(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: BTreeMap<String, Vec<String>>,
@ -70,7 +71,7 @@ pub fn push_synonyms_addition(
}
pub fn apply_synonyms_addition(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
synonyms_store: store::Synonyms,
addition: BTreeMap<String, Vec<String>>,

View File

@ -4,6 +4,7 @@ use std::iter::FromIterator;
use fst::{set::OpBuilder, SetBuilder};
use sdset::SetBuf;
use crate::database::{MainT, UpdateT};
use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update};
@ -50,7 +51,7 @@ impl SynonymsDeletion {
}
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_synonyms_deletion(
writer,
@ -63,7 +64,7 @@ impl SynonymsDeletion {
}
pub fn push_synonyms_deletion(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<UpdateT>,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: BTreeMap<String, Option<Vec<String>>>,
@ -77,7 +78,7 @@ pub fn push_synonyms_deletion(
}
pub fn apply_synonyms_deletion(
writer: &mut heed::RwTxn,
writer: &mut heed::RwTxn<MainT>,
main_store: store::Main,
synonyms_store: store::Synonyms,
deletion: BTreeMap<String, Option<Vec<String>>>,

View File

@ -16,7 +16,7 @@ bincode = "1.2.0"
chrono = { version = "0.4.9", features = ["serde"] }
crossbeam-channel = "0.4.0"
env_logger = "0.7.1"
heed = "0.5.0"
heed = "0.6.0"
http = "0.1.19"
indexmap = { version = "1.3.0", features = ["serde-1"] }
isahc = "0.7.6"

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::types::{SerdeBincode, Str};
use log::error;
use meilisearch_core::{Database, Error as MError, MResult};
use meilisearch_core::{Database, MainT, UpdateT, Error as MError, MResult};
use sysinfo::Pid;
use crate::option::Opt;
@ -37,32 +37,32 @@ pub struct DataInner {
}
impl DataInner {
pub fn is_indexing(&self, reader: &heed::RoTxn, index: &str) -> MResult<Option<bool>> {
pub fn is_indexing(&self, reader: &heed::RoTxn<UpdateT>, index: &str) -> MResult<Option<bool>> {
match self.db.open_index(&index) {
Some(index) => index.current_update_id(&reader).map(|u| Some(u.is_some())),
None => Ok(None),
}
}
pub fn last_update(&self, reader: &heed::RoTxn) -> MResult<Option<DateTime<Utc>>> {
pub fn last_update(&self, reader: &heed::RoTxn<MainT>) -> MResult<Option<DateTime<Utc>>> {
match self
.db
.common_store()
.get::<Str, SerdeDatetime>(reader, LAST_UPDATE_KEY)?
.get::<_, Str, SerdeDatetime>(reader, LAST_UPDATE_KEY)?
{
Some(datetime) => Ok(Some(datetime)),
None => Ok(None),
}
}
pub fn set_last_update(&self, writer: &mut heed::RwTxn) -> MResult<()> {
pub fn set_last_update(&self, writer: &mut heed::RwTxn<MainT>) -> MResult<()> {
self.db
.common_store()
.put::<Str, SerdeDatetime>(writer, LAST_UPDATE_KEY, &Utc::now())
.put::<_, Str, SerdeDatetime>(writer, LAST_UPDATE_KEY, &Utc::now())
.map_err(Into::into)
}
pub fn compute_stats(&self, writer: &mut heed::RwTxn, index_uid: &str) -> MResult<()> {
pub fn compute_stats(&self, writer: &mut heed::RwTxn<MainT>, index_uid: &str) -> MResult<()> {
let index = match self.db.open_index(&index_uid) {
Some(index) => index,
None => {

View File

@ -4,6 +4,7 @@ use log::error;
use meilisearch_core::criterion::*;
use meilisearch_core::Highlight;
use meilisearch_core::{Index, RankedMap};
use meilisearch_core::MainT;
use meilisearch_schema::{Schema, SchemaAttr};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -157,7 +158,7 @@ impl<'a> SearchBuilder<'a> {
self
}
pub fn search(&self, reader: &heed::RoTxn) -> Result<SearchResult, Error> {
pub fn search(&self, reader: &heed::RoTxn<MainT>) -> Result<SearchResult, Error> {
let schema = self.index.main.schema(reader);
let schema = schema.map_err(|e| Error::Internal(e.to_string()))?;
let schema = match schema {
@ -285,7 +286,7 @@ impl<'a> SearchBuilder<'a> {
pub fn get_criteria(
&self,
reader: &heed::RoTxn,
reader: &heed::RoTxn<MainT>,
ranked_map: &'a RankedMap,
schema: &Schema,
) -> Result<Option<Criteria<'a>>, Error> {

View File

@ -29,14 +29,13 @@ impl ContextExt for Context<Data> {
let request_index: Option<String> = None; //self.param::<String>("index").ok();
let db = &self.state().db;
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let token_key = format!("{}{}", TOKEN_PREFIX_KEY, user_api_key);
let token_config = db
.common_store()
.get::<Str, SerdeBincode<Token>>(&reader, &token_key)
.get::<_, Str, SerdeBincode<Token>>(&reader, &token_key)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::invalid_token(format!(
"Api key does not exist: {}",

View File

@ -21,8 +21,8 @@ pub async fn get_document(ctx: Context<Data>) -> SResult<Response> {
let identifier = ctx.identifier()?;
let document_id = meilisearch_core::serde::compute_document_id(identifier.clone());
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let response = index
.document::<IndexMap<String, Value>>(&reader, None, document_id)
@ -49,16 +49,16 @@ pub async fn delete_document(ctx: Context<Data>) -> SResult<Response> {
let identifier = ctx.identifier()?;
let document_id = meilisearch_core::serde::compute_document_id(identifier.clone());
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut update_writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut documents_deletion = index.documents_deletion();
documents_deletion.delete_document_by_id(document_id);
let update_id = documents_deletion
.finalize(&mut writer)
.finalize(&mut update_writer)
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
update_writer.commit().map_err(ResponseError::internal)?;
let response_body = IndexUpdateResponse { update_id };
Ok(tide::response::json(response_body)
@ -83,8 +83,8 @@ pub async fn get_all_documents(ctx: Context<Data>) -> SResult<Response> {
let offset = query.offset.unwrap_or(0);
let limit = query.limit.unwrap_or(20);
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let documents_ids: Result<BTreeSet<_>, _> =
match index.documents_fields_counts.documents_ids(&reader) {
@ -146,18 +146,19 @@ async fn update_multiple_documents(mut ctx: Context<Data>, is_partial: bool) ->
ctx.body_json().await.map_err(ResponseError::bad_request)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let mut update_writer = db.update_write_txn().map_err(ResponseError::internal)?;
let current_schema = index
.main
.schema(&writer)
.schema(&reader)
.map_err(ResponseError::internal)?;
if current_schema.is_none() {
match data.first().and_then(infered_schema) {
Some(schema) => {
index
.schema_update(&mut writer, schema)
.schema_update(&mut update_writer, schema)
.map_err(ResponseError::internal)?;
}
None => return Err(ResponseError::bad_request("Could not infer a schema")),
@ -175,10 +176,10 @@ async fn update_multiple_documents(mut ctx: Context<Data>, is_partial: bool) ->
}
let update_id = document_addition
.finalize(&mut writer)
.finalize(&mut update_writer)
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
update_writer.commit().map_err(ResponseError::internal)?;
let response_body = IndexUpdateResponse { update_id };
Ok(tide::response::json(response_body)
@ -200,8 +201,8 @@ pub async fn delete_multiple_documents(mut ctx: Context<Data>) -> SResult<Respon
let data: Vec<Value> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut documents_deletion = index.documents_deletion();
@ -229,8 +230,9 @@ pub async fn clear_all_documents(ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let update_id = index
.clear_all(&mut writer)
.map_err(ResponseError::internal)?;

View File

@ -11,12 +11,11 @@ const UNHEALTHY_KEY: &str = "_is_unhealthy";
pub async fn get_health(ctx: Context<Data>) -> SResult<()> {
let db = &ctx.state().db;
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let common_store = ctx.state().db.common_store();
if let Ok(Some(_)) = common_store.get::<Str, Unit>(&reader, UNHEALTHY_KEY) {
if let Ok(Some(_)) = common_store.get::<_, Str, Unit>(&reader, UNHEALTHY_KEY) {
return Err(ResponseError::Maintenance);
}
@ -27,11 +26,10 @@ pub async fn set_healthy(ctx: Context<Data>) -> SResult<()> {
ctx.is_allowed(Admin)?;
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
let common_store = ctx.state().db.common_store();
match common_store.delete::<Str>(&mut writer, UNHEALTHY_KEY) {
match common_store.delete::<_, Str>(&mut writer, UNHEALTHY_KEY) {
Ok(_) => (),
Err(e) => return Err(ResponseError::internal(e)),
}
@ -47,12 +45,11 @@ pub async fn set_unhealthy(ctx: Context<Data>) -> SResult<()> {
ctx.is_allowed(Admin)?;
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
let common_store = ctx.state().db.common_store();
if let Err(e) = common_store.put::<Str, Unit>(&mut writer, UNHEALTHY_KEY, &()) {
if let Err(e) = common_store.put::<_, Str, Unit>(&mut writer, UNHEALTHY_KEY, &()) {
return Err(ResponseError::internal(e));
}

View File

@ -31,8 +31,8 @@ pub async fn list_indexes(ctx: Context<Data>) -> SResult<Response> {
let indexes_uids = ctx.state().db.indexes_uids();
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let mut response_body = Vec::new();
@ -89,8 +89,8 @@ pub async fn get_index(ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let uid = ctx.url_param("index")?;
let name = index
@ -164,8 +164,8 @@ pub async fn create_index(mut ctx: Context<Data>) -> SResult<Response> {
Err(e) => return Err(ResponseError::create_index(e)),
};
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
let mut update_writer = db.update_write_txn().map_err(ResponseError::internal)?;
created_index
.main
@ -184,12 +184,13 @@ pub async fn create_index(mut ctx: Context<Data>) -> SResult<Response> {
let mut response_update_id = None;
if let Some(schema) = schema {
let update_id = created_index
.schema_update(&mut writer, schema)
.schema_update(&mut update_writer, schema)
.map_err(ResponseError::internal)?;
response_update_id = Some(update_id)
}
writer.commit().map_err(ResponseError::internal)?;
update_writer.commit().map_err(ResponseError::internal)?;
let response_body = IndexCreateResponse {
name: body.name,
@ -232,9 +233,7 @@ pub async fn update_index(mut ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
index
.main
@ -247,7 +246,7 @@ pub async fn update_index(mut ctx: Context<Data>) -> SResult<Response> {
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let created_at = index
.main
@ -286,8 +285,8 @@ pub async fn get_index_schema(ctx: Context<Data>) -> SResult<Response> {
// Tide doesn't support "no query param"
let params: SchemaParams = ctx.url_query().unwrap_or_default();
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let schema = index
.main
@ -326,8 +325,7 @@ pub async fn update_schema(mut ctx: Context<Data>) -> SResult<Response> {
};
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let index = db
.open_index(&index_uid)
@ -348,8 +346,8 @@ pub async fn update_schema(mut ctx: Context<Data>) -> SResult<Response> {
pub async fn get_update_status(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesRead)?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.update_read_txn().map_err(ResponseError::internal)?;
let update_id = ctx
.param::<u64>("update_id")
@ -375,8 +373,8 @@ pub async fn get_update_status(ctx: Context<Data>) -> SResult<Response> {
pub async fn get_all_updates_status(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesRead)?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.update_read_txn().map_err(ResponseError::internal)?;
let index = ctx.index()?;
let all_status = index
@ -413,8 +411,8 @@ pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpda
}
if let Some(index) = data.db.open_index(&index_uid) {
let env = &data.db.env;
let mut writer = match env.write_txn() {
let db = &data.db;
let mut writer = match db.main_write_txn() {
Ok(writer) => writer,
Err(e) => {
error!("Impossible to get write_txn; {}", e);

View File

@ -26,15 +26,14 @@ pub async fn list(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(Admin)?;
let db = &ctx.state().db;
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let common_store = db.common_store();
let mut response: Vec<Token> = Vec::new();
let iter = common_store
.prefix_iter::<Str, SerdeBincode<Token>>(&reader, TOKEN_PREFIX_KEY)
.prefix_iter::<_, Str, SerdeBincode<Token>>(&reader, TOKEN_PREFIX_KEY)
.map_err(ResponseError::internal)?;
for result in iter {
@ -50,14 +49,13 @@ pub async fn get(ctx: Context<Data>) -> SResult<Response> {
let request_key = ctx.url_param("key")?;
let db = &ctx.state().db;
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key);
let token_config = db
.common_store()
.get::<Str, SerdeBincode<Token>>(&reader, &token_key)
.get::<_, Str, SerdeBincode<Token>>(&reader, &token_key)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::not_found(format!(
"token key: {}",
@ -97,11 +95,10 @@ pub async fn create(mut ctx: Context<Data>) -> SResult<Response> {
};
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
db.common_store()
.put::<Str, SerdeBincode<Token>>(&mut writer, &token_key, &token_definition)
.put::<_, Str, SerdeBincode<Token>>(&mut writer, &token_key, &token_definition)
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
@ -128,15 +125,14 @@ pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
let data: UpdatedRequest = ctx.body_json().await.map_err(ResponseError::bad_request)?;
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
let common_store = db.common_store();
let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key);
let mut token_config = common_store
.get::<Str, SerdeBincode<Token>>(&writer, &token_key)
.get::<_, Str, SerdeBincode<Token>>(&writer, &token_key)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::not_found(format!(
"token key: {}",
@ -167,7 +163,7 @@ pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
token_config.updated_at = Utc::now();
common_store
.put::<Str, SerdeBincode<Token>>(&mut writer, &token_key, &token_config)
.put::<_, Str, SerdeBincode<Token>>(&mut writer, &token_key, &token_config)
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
@ -182,15 +178,14 @@ pub async fn delete(ctx: Context<Data>) -> SResult<StatusCode> {
let request_key = ctx.url_param("key")?;
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let mut writer = db.main_write_txn().map_err(ResponseError::internal)?;
let common_store = db.common_store();
let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key);
common_store
.delete::<Str>(&mut writer, &token_key)
.delete::<_, Str>(&mut writer, &token_key)
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;

View File

@ -33,8 +33,8 @@ pub async fn search_with_url_query(ctx: Context<Data>) -> SResult<Response> {
// ctx.is_allowed(DocumentsRead)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let schema = index
.main
@ -210,9 +210,7 @@ pub async fn search_multi_index(mut ctx: Context<Data>) -> SResult<Response> {
}
}
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let response = search_builder
.search(&reader)
.map_err(ResponseError::internal)?;

View File

@ -34,8 +34,8 @@ pub async fn get(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(SettingsRead)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let settings = match index.main.customs(&reader).unwrap() {
Some(bytes) => bincode::deserialize(bytes).unwrap(),
@ -52,10 +52,11 @@ pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_write_txn().map_err(ResponseError::internal)?;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut current_settings = match index.main.customs(&writer).unwrap() {
let mut current_settings = match index.main.customs(&reader).unwrap() {
Some(bytes) => bincode::deserialize(bytes).unwrap(),
None => SettingBody::default(),
};

View File

@ -26,8 +26,9 @@ pub async fn index_stat(ctx: Context<Data>) -> SResult<Response> {
let index_uid = ctx.url_param("index")?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let update_reader = db.update_read_txn().map_err(ResponseError::internal)?;
let number_of_documents = index
.main
@ -42,7 +43,7 @@ pub async fn index_stat(ctx: Context<Data>) -> SResult<Response> {
let is_indexing = ctx
.state()
.is_indexing(&reader, &index_uid)
.is_indexing(&update_reader, &index_uid)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'is_indexing' date not found"))?;
@ -68,8 +69,8 @@ pub async fn get_stats(ctx: Context<Data>) -> SResult<Response> {
let mut index_list = HashMap::new();
let db = &ctx.state().db;
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let update_reader = db.update_read_txn().map_err(ResponseError::internal)?;
let indexes_set = ctx.state().db.indexes_uids();
for index_uid in indexes_set {
@ -90,7 +91,7 @@ pub async fn get_stats(ctx: Context<Data>) -> SResult<Response> {
let is_indexing = ctx
.state()
.is_indexing(&reader, &index_uid)
.is_indexing(&update_reader, &index_uid)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'is_indexing' date not found"))?;

View File

@ -12,8 +12,8 @@ pub async fn list(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(SettingsRead)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let stop_words_fst = index
.main
@ -35,8 +35,8 @@ pub async fn add(mut ctx: Context<Data>) -> SResult<Response> {
let data: Vec<String> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut stop_words_addition = index.stop_words_addition();
for stop_word in data {
@ -61,8 +61,8 @@ pub async fn delete(mut ctx: Context<Data>) -> SResult<Response> {
let data: Vec<String> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut stop_words_deletion = index.stop_words_deletion();
for stop_word in data {

View File

@ -31,8 +31,8 @@ pub async fn list(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(SettingsRead)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let synonyms_fst = index
.main
@ -65,8 +65,8 @@ pub async fn get(ctx: Context<Data>) -> SResult<Response> {
let synonym = ctx.url_param("synonym")?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let synonym_list = index
.synonyms
@ -87,8 +87,8 @@ pub async fn create(mut ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut synonyms_addition = index.synonyms_addition();
@ -125,8 +125,8 @@ pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let data: Vec<String> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut synonyms_addition = index.synonyms_addition();
synonyms_addition.add_synonym(synonym.clone(), data.clone().into_iter());
@ -147,8 +147,8 @@ pub async fn delete(ctx: Context<Data>) -> SResult<Response> {
let synonym = ctx.url_param("synonym")?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut synonyms_deletion = index.synonyms_deletion();
synonyms_deletion.delete_all_alternatives_of(synonym);
@ -171,8 +171,8 @@ pub async fn batch_write(mut ctx: Context<Data>) -> SResult<Response> {
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let mut synonyms_addition = index.synonyms_addition();
for raw in data {
@ -207,12 +207,13 @@ pub async fn clear(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(SettingsWrite)?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let db = &ctx.state().db;
let reader = db.main_read_txn().map_err(ResponseError::internal)?;
let mut writer = db.update_write_txn().map_err(ResponseError::internal)?;
let synonyms_fst = index
.main
.synonyms_fst(&writer)
.synonyms_fst(&reader)
.map_err(ResponseError::internal)?;
let synonyms_fst = synonyms_fst.unwrap_or_default();