From aa05459e4f4412bf730255fc49d6e891452918ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 7 Oct 2019 16:16:04 +0200 Subject: [PATCH] Introduce a background thread that manage updates to do --- meilidb-core/Cargo.toml | 3 + meilidb-core/src/database.rs | 76 ++++++++++-- meilidb-core/src/error.rs | 46 ++++--- meilidb-core/src/main.rs | 28 +++-- meilidb-core/src/store/mod.rs | 44 ++++++- meilidb-core/src/store/updates.rs | 27 ++--- meilidb-core/src/store/updates_results.rs | 26 ++++ meilidb-core/src/update/documents_addition.rs | 35 +++++- meilidb-core/src/update/documents_deletion.rs | 29 ++++- meilidb-core/src/update/mod.rs | 112 +++++++++++------- 10 files changed, 317 insertions(+), 109 deletions(-) diff --git a/meilidb-core/Cargo.toml b/meilidb-core/Cargo.toml index 9e0bbd71f..775536524 100644 --- a/meilidb-core/Cargo.toml +++ b/meilidb-core/Cargo.toml @@ -7,8 +7,11 @@ edition = "2018" [dependencies] bincode = "1.1.4" byteorder = "1.3.2" +crossbeam-channel = "0.3.9" deunicode = "1.0.0" +env_logger = "0.7.0" hashbrown = { version = "0.6.0", features = ["serde"] } +log = "0.4.8" meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } once_cell = "1.2.0" diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 620b1d462..03c340603 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -1,13 +1,41 @@ use std::collections::HashMap; -use std::io; use std::path::Path; use std::sync::{Arc, RwLock}; -use crate::{store, Index, MResult}; +use std::{fs, thread}; + +use crossbeam_channel::Receiver; +use log::error; + +use crate::{store, update, Index, MResult}; pub struct Database { pub rkv: Arc>, main_store: rkv::SingleStore, - indexes: RwLock>, + indexes_store: rkv::SingleStore, + indexes: RwLock)>>, +} + +fn update_awaiter(receiver: Receiver<()>, rkv: Arc>, index: Index) { + for () in receiver { + // consume all updates in order (oldest first) + loop { + let rkv = match rkv.read() { + Ok(rkv) => rkv, + Err(e) => { error!("rkv RwLock read failed: {}", e); break } + }; + let mut writer = match rkv.write() { + Ok(writer) => writer, + Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break } + }; + + match update::update_task(&mut writer, index.clone(), None as Option::) { + Ok(true) => if let Err(e) = writer.commit() { error!("update transaction failed: {}", e) }, + // no more updates to handle for now + Ok(false) => { writer.abort(); break }, + Err(e) => { error!("update task failed: {}", e); writer.abort() }, + } + } + } } impl Database { @@ -15,6 +43,8 @@ impl Database { let manager = rkv::Manager::singleton(); let mut rkv_write = manager.write().unwrap(); + fs::create_dir_all(path.as_ref())?; + let rkv = rkv_write .get_or_create(path.as_ref(), |path| { let mut builder = rkv::Rkv::environment_builder(); @@ -26,12 +56,13 @@ impl Database { let rkv_read = rkv.read().unwrap(); let create_options = rkv::store::Options::create(); - let main_store = rkv_read.open_single("indexes", create_options)?; + let main_store = rkv_read.open_single("main", create_options)?; + let indexes_store = rkv_read.open_single("indexes", create_options)?; // list all indexes that needs to be opened let mut must_open = Vec::new(); let reader = rkv_read.read()?; - for result in main_store.iter_start(&reader)? { + for result in indexes_store.iter_start(&reader)? { let (key, _) = result?; if let Ok(index_name) = std::str::from_utf8(key) { must_open.push(index_name.to_owned()); @@ -43,13 +74,24 @@ impl Database { // open the previously aggregated indexes let mut indexes = HashMap::new(); for index_name in must_open { - let index = store::open(&rkv_read, &index_name)?; - indexes.insert(index_name, index); + + let (sender, receiver) = crossbeam_channel::bounded(100); + let index = store::open(&rkv_read, &index_name, sender.clone())?; + let rkv_clone = rkv.clone(); + let index_clone = index.clone(); + let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone)); + + // send an update notification to make sure that + // possible previous boot updates are consumed + sender.send(()).unwrap(); + + let result = indexes.insert(index_name, (index, handle)); + assert!(result.is_none(), "The index should not have been already open"); } drop(rkv_read); - Ok(Database { rkv, main_store, indexes: RwLock::new(indexes) }) + Ok(Database { rkv, main_store, indexes_store, indexes: RwLock::new(indexes) }) } pub fn open_index(&self, name: impl Into) -> MResult { @@ -57,20 +99,26 @@ impl Database { let name = name.into(); match indexes_lock.get(&name) { - Some(index) => Ok(*index), + Some((index, _)) => Ok(index.clone()), None => { drop(indexes_lock); let rkv_lock = self.rkv.read().unwrap(); - let index = store::create(&rkv_lock, &name)?; + let (sender, receiver) = crossbeam_channel::bounded(100); + let index = store::create(&rkv_lock, &name, sender)?; let mut writer = rkv_lock.write()?; let value = rkv::Value::Blob(&[]); - self.main_store.put(&mut writer, &name, &value)?; + self.indexes_store.put(&mut writer, &name, &value)?; { let mut indexes_write = self.indexes.write().unwrap(); - indexes_write.entry(name).or_insert(index); + indexes_write.entry(name).or_insert_with(|| { + let rkv_clone = self.rkv.clone(); + let index_clone = index.clone(); + let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone)); + (index.clone(), handle) + }); } writer.commit()?; @@ -84,4 +132,8 @@ impl Database { let indexes = self.indexes.read().unwrap(); Ok(indexes.keys().cloned().collect()) } + + pub fn main_store(&self) -> rkv::SingleStore { + self.main_store + } } diff --git a/meilidb-core/src/error.rs b/meilidb-core/src/error.rs index dd0200f30..ae3ffd884 100644 --- a/meilidb-core/src/error.rs +++ b/meilidb-core/src/error.rs @@ -1,55 +1,62 @@ -use std::{error, fmt}; +use std::{error, fmt, io}; use crate::serde::SerializerError; pub type MResult = Result; #[derive(Debug)] pub enum Error { + Io(io::Error), SchemaDiffer, SchemaMissing, WordIndexMissing, MissingDocumentId, - RkvError(rkv::StoreError), - FstError(fst::Error), - RmpDecodeError(rmp_serde::decode::Error), - RmpEncodeError(rmp_serde::encode::Error), - BincodeError(bincode::Error), - SerializerError(SerializerError), + Rkv(rkv::StoreError), + Fst(fst::Error), + RmpDecode(rmp_serde::decode::Error), + RmpEncode(rmp_serde::encode::Error), + Bincode(bincode::Error), + Serializer(SerializerError), +} + +impl From for Error { + fn from(error: io::Error) -> Error { + Error::Io(error) + } } impl From for Error { fn from(error: rkv::StoreError) -> Error { - Error::RkvError(error) + Error::Rkv(error) } } impl From for Error { fn from(error: fst::Error) -> Error { - Error::FstError(error) + Error::Fst(error) } } impl From for Error { fn from(error: rmp_serde::decode::Error) -> Error { - Error::RmpDecodeError(error) + Error::RmpDecode(error) } } impl From for Error { fn from(error: rmp_serde::encode::Error) -> Error { - Error::RmpEncodeError(error) + Error::RmpEncode(error) } } impl From for Error { fn from(error: bincode::Error) -> Error { - Error::BincodeError(error) + Error::Bincode(error) } } impl From for Error { fn from(error: SerializerError) -> Error { - Error::SerializerError(error) + Error::Serializer(error) } } @@ -57,16 +64,17 @@ impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use self::Error::*; match self { + Io(e) => write!(f, "{}", e), SchemaDiffer => write!(f, "schemas differ"), SchemaMissing => write!(f, "this index does not have a schema"), WordIndexMissing => write!(f, "this index does not have a word index"), MissingDocumentId => write!(f, "document id is missing"), - RkvError(e) => write!(f, "rkv error; {}", e), - FstError(e) => write!(f, "fst error; {}", e), - RmpDecodeError(e) => write!(f, "rmp decode error; {}", e), - RmpEncodeError(e) => write!(f, "rmp encode error; {}", e), - BincodeError(e) => write!(f, "bincode error; {}", e), - SerializerError(e) => write!(f, "serializer error; {}", e), + Rkv(e) => write!(f, "rkv error; {}", e), + Fst(e) => write!(f, "fst error; {}", e), + RmpDecode(e) => write!(f, "rmp decode error; {}", e), + RmpEncode(e) => write!(f, "rmp encode error; {}", e), + Bincode(e) => write!(f, "bincode error; {}", e), + Serializer(e) => write!(f, "serializer error; {}", e), } } } diff --git a/meilidb-core/src/main.rs b/meilidb-core/src/main.rs index 5d230c82c..247edbff4 100644 --- a/meilidb-core/src/main.rs +++ b/meilidb-core/src/main.rs @@ -1,17 +1,25 @@ use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions}; use std::{fs, path::Path}; -use meilidb_core::{Database, QueryBuilder}; +use meilidb_core::{Database, MResult, QueryBuilder}; + +fn main() -> MResult<()> { + env_logger::init(); -fn main() { let path = Path::new("test.rkv"); - fs::create_dir_all(path).unwrap(); - - let database = Database::open_or_create(path).unwrap(); + let database = Database::open_or_create(path)?; println!("{:?}", database.indexes_names()); - let hello = database.open_index("hello").unwrap(); - let hello1 = database.open_index("hello1").unwrap(); - let hello2 = database.open_index("hello2").unwrap(); + let hello = database.open_index("hello")?; + let hello1 = database.open_index("hello1")?; + let hello2 = database.open_index("hello2")?; + + let mut additions = hello.documents_addition(); + additions.extend(vec![()]); + + let rkv = database.rkv.read().unwrap(); + let writer = rkv.write()?; + + additions.finalize(writer)?; // { // let mut writer = env.write().unwrap(); @@ -44,4 +52,8 @@ fn main() { // let documents = builder.query(&reader, "oubli", 0..20).unwrap(); // println!("{:?}", documents); + + std::thread::sleep(std::time::Duration::from_secs(10)); + + Ok(()) } diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index 47dad0a3b..c57b82131 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -14,6 +14,8 @@ pub use self::synonyms::Synonyms; pub use self::updates::Updates; pub use self::updates_results::UpdatesResults; +use crate::update; + fn aligned_to(bytes: &[u8], align: usize) -> bool { (bytes as *const _ as *const () as usize) % align == 0 } @@ -46,31 +48,62 @@ fn updates_results_name(name: &str) -> String { format!("store-{}-updates-results", name) } -#[derive(Copy, Clone)] +#[derive(Clone)] pub struct Index { pub main: Main, pub postings_lists: PostingsLists, pub documents_fields: DocumentsFields, pub synonyms: Synonyms, pub docs_words: DocsWords, + pub updates: Updates, pub updates_results: UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, } -pub fn create(env: &rkv::Rkv, name: &str) -> Result { - open_options(env, name, rkv::StoreOptions::create()) +impl Index { + pub fn documents_addition(&self) -> update::DocumentsAddition { + update::DocumentsAddition::new( + self.updates, + self.updates_results, + self.updates_notifier.clone(), + ) + } + + pub fn documents_deletion(&self) -> update::DocumentsDeletion { + update::DocumentsDeletion::new( + self.updates, + self.updates_results, + self.updates_notifier.clone(), + ) + } } -pub fn open(env: &rkv::Rkv, name: &str) -> Result { +pub fn create( + env: &rkv::Rkv, + name: &str, + updates_notifier: crossbeam_channel::Sender<()>, +) -> Result +{ + open_options(env, name, rkv::StoreOptions::create(), updates_notifier) +} + +pub fn open( + env: &rkv::Rkv, + name: &str, + updates_notifier: crossbeam_channel::Sender<()>, +) -> Result +{ let mut options = rkv::StoreOptions::default(); options.create = false; - open_options(env, name, options) + open_options(env, name, options, updates_notifier) } fn open_options( env: &rkv::Rkv, name: &str, options: rkv::StoreOptions, + updates_notifier: crossbeam_channel::Sender<()>, ) -> Result { // create all the store names @@ -99,5 +132,6 @@ fn open_options( docs_words: DocsWords { docs_words }, updates: Updates { updates }, updates_results: UpdatesResults { updates_results }, + updates_notifier, }) } diff --git a/meilidb-core/src/store/updates.rs b/meilidb-core/src/store/updates.rs index 28ff2306a..941abba6b 100644 --- a/meilidb-core/src/store/updates.rs +++ b/meilidb-core/src/store/updates.rs @@ -10,7 +10,7 @@ pub struct Updates { impl Updates { // TODO we should use the MDB_LAST op but // it is not exposed by the rkv library - fn last_update_id<'a>( + pub fn last_update_id<'a>( &self, reader: &'a impl rkv::Readable, ) -> Result>)>, rkv::StoreError> @@ -60,21 +60,18 @@ impl Updates { self.updates.get(reader, update_id_bytes).map(|v| v.is_some()) } - pub fn push_back( + pub fn put_update( &self, writer: &mut rkv::Writer, + update_id: u64, update: &Update, - ) -> MResult + ) -> MResult<()> { - let last_update_id = self.last_update_id(writer)?; - let last_update_id = last_update_id.map_or(0, |(n, _)| n + 1); - let last_update_id_bytes = last_update_id.to_be_bytes(); - + let update_id_bytes = update_id.to_be_bytes(); let update = rmp_serde::to_vec_named(&update)?; let blob = Value::Blob(&update); - self.updates.put(writer, last_update_id_bytes, &blob)?; - - Ok(last_update_id) + self.updates.put(writer, update_id_bytes, &blob)?; + Ok(()) } pub fn pop_front( @@ -82,20 +79,20 @@ impl Updates { writer: &mut rkv::Writer, ) -> MResult> { - let (last_id, last_data) = match self.first_update_id(writer)? { + let (first_id, first_data) = match self.first_update_id(writer)? { Some(entry) => entry, None => return Ok(None), }; - match last_data { + match first_data { Some(Value::Blob(bytes)) => { let update = rmp_serde::from_read_ref(&bytes)?; // remove it from the database now - let last_id_bytes = last_id.to_be_bytes(); - self.updates.delete(writer, last_id_bytes)?; + let first_id_bytes = first_id.to_be_bytes(); + self.updates.delete(writer, first_id_bytes)?; - Ok(Some((last_id, update))) + Ok(Some((first_id, update))) }, Some(value) => panic!("invalid type {:?}", value), None => Ok(None), diff --git a/meilidb-core/src/store/updates_results.rs b/meilidb-core/src/store/updates_results.rs index 8fe0ccb9f..9b0c2d435 100644 --- a/meilidb-core/src/store/updates_results.rs +++ b/meilidb-core/src/store/updates_results.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use rkv::Value; use crate::{update::UpdateResult, MResult}; @@ -7,6 +8,31 @@ pub struct UpdatesResults { } impl UpdatesResults { + // TODO we should use the MDB_LAST op but + // it is not exposed by the rkv library + pub fn last_update_id<'a>( + &self, + reader: &'a impl rkv::Readable, + ) -> Result>)>, rkv::StoreError> + { + let mut last = None; + let iter = self.updates_results.iter_start(reader)?; + for result in iter { + let (key, data) = result?; + last = Some((key, data)); + } + + let (last_key, last_data) = match last { + Some(entry) => entry, + None => return Ok(None), + }; + + let array = last_key.try_into().unwrap(); + let number = u64::from_be_bytes(array); + + Ok(Some((number, last_data))) + } + pub fn put_update_result( &self, writer: &mut rkv::Writer, diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 7e87d3121..16de42138 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -13,22 +13,49 @@ use crate::{Error, RankedMap}; pub struct DocumentsAddition { updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, documents: Vec, } impl DocumentsAddition { - pub fn new(updates_store: store::Updates) -> DocumentsAddition { - DocumentsAddition { updates_store, documents: Vec::new() } + pub fn new( + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + ) -> DocumentsAddition + { + DocumentsAddition { + updates_store, + updates_results_store, + updates_notifier, + documents: Vec::new(), + } } pub fn update_document(&mut self, document: D) { self.documents.push(document); } - pub fn finalize(self, writer: &mut rkv::Writer) -> Result + pub fn finalize(self, mut writer: rkv::Writer) -> Result where D: serde::Serialize { - push_documents_addition(writer, self.updates_store, self.documents) + let update_id = push_documents_addition( + &mut writer, + self.updates_store, + self.updates_results_store, + self.documents, + )?; + writer.commit()?; + let _ = self.updates_notifier.send(()); + + Ok(update_id) + } +} + +impl Extend for DocumentsAddition { + fn extend>(&mut self, iter: T) { + self.documents.extend(iter) } } diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs index 8528eef06..21cc3465e 100644 --- a/meilidb-core/src/update/documents_deletion.rs +++ b/meilidb-core/src/update/documents_deletion.rs @@ -11,12 +11,24 @@ use crate::store; pub struct DocumentsDeletion { updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, documents: Vec, } impl DocumentsDeletion { - pub fn new(updates_store: store::Updates) -> DocumentsDeletion { - DocumentsDeletion { updates_store, documents: Vec::new() } + pub fn new( + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + updates_notifier: crossbeam_channel::Sender<()>, + ) -> DocumentsDeletion + { + DocumentsDeletion { + updates_store, + updates_results_store, + updates_notifier, + documents: Vec::new(), + } } pub fn delete_document_by_id(&mut self, document_id: DocumentId) { @@ -37,8 +49,17 @@ impl DocumentsDeletion { Ok(()) } - pub fn finalize(self, writer: &mut rkv::Writer) -> Result { - push_documents_deletion(writer, self.updates_store, self.documents) + pub fn finalize(self, mut writer: rkv::Writer) -> Result { + let update_id = push_documents_deletion( + &mut writer, + self.updates_store, + self.updates_results_store, + self.documents, + )?; + writer.commit()?; + let _ = self.updates_notifier.send(()); + + Ok(update_id) } } diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 9bd929f0d..1e9c9c00d 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -5,16 +5,17 @@ pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; use std::time::{Duration, Instant}; +use log::debug; use serde::{Serialize, Deserialize}; use crate::{store, Error, MResult, DocumentId, RankedMap}; -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum Update { DocumentsAddition(Vec), DocumentsDeletion(Vec), } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateType { DocumentsAddition { number: usize }, DocumentsDeletion { number: usize }, @@ -59,11 +60,29 @@ pub fn update_status( } } +pub fn biggest_update_id( + writer: &mut rkv::Writer, + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, +) -> MResult> +{ + let last_update_id = updates_store.last_update_id(writer)?; + let last_update_id = last_update_id.map(|(n, _)| n); + + let last_update_results_id = updates_results_store.last_update_id(writer)?; + let last_update_results_id = last_update_results_id.map(|(n, _)| n); + + let max = last_update_id.max(last_update_results_id); + + Ok(max) +} + pub fn push_documents_addition( writer: &mut rkv::Writer, updates_store: store::Updates, + updates_results_store: store::UpdatesResults, addition: Vec, -) -> Result +) -> MResult { let mut values = Vec::with_capacity(addition.len()); for add in addition { @@ -72,22 +91,29 @@ pub fn push_documents_addition( values.push(add); } - let update = Update::DocumentsAddition(values); - let update_id = updates_store.push_back(writer, &update)?; + let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; + let last_update_id = last_update_id.map_or(0, |n| n + 1); - Ok(update_id) + let update = Update::DocumentsAddition(values); + let update_id = updates_store.put_update(writer, last_update_id, &update)?; + + Ok(last_update_id) } pub fn push_documents_deletion( writer: &mut rkv::Writer, updates_store: store::Updates, + updates_results_store: store::UpdatesResults, deletion: Vec, -) -> Result +) -> MResult { - let update = Update::DocumentsDeletion(deletion); - let update_id = updates_store.push_back(writer, &update)?; + let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; + let last_update_id = last_update_id.map_or(0, |n| n + 1); - Ok(update_id) + let update = Update::DocumentsDeletion(deletion); + let update_id = updates_store.put_update(writer, last_update_id, &update)?; + + Ok(last_update_id) } pub fn update_task( @@ -101,56 +127,58 @@ pub fn update_task( None => return Ok(false), }; + debug!("Processing update number {}", update_id); + let (update_type, result, duration) = match update { Update::DocumentsAddition(documents) => { - let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + let start = Instant::now(); - let schema = match index.main.schema(writer)? { - Some(schema) => schema, - None => return Err(Error::SchemaMissing), - }; let ranked_map = match index.main.ranked_map(writer)? { Some(ranked_map) => ranked_map, None => RankedMap::default(), }; - let start = Instant::now(); - let result = apply_documents_addition( - writer, - index.main, - index.documents_fields, - index.postings_lists, - index.docs_words, - &schema, - ranked_map, - documents, - ); + let update_type = UpdateType::DocumentsAddition { number: documents.len() }; + + let result = match index.main.schema(writer)? { + Some(schema) => apply_documents_addition( + writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + &schema, + ranked_map, + documents, + ), + None => Err(Error::SchemaMissing), + }; (update_type, result, start.elapsed()) }, Update::DocumentsDeletion(documents) => { - let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + let start = Instant::now(); - let schema = match index.main.schema(writer)? { - Some(schema) => schema, - None => return Err(Error::SchemaMissing), - }; let ranked_map = match index.main.ranked_map(writer)? { Some(ranked_map) => ranked_map, None => RankedMap::default(), }; - let start = Instant::now(); - let result = apply_documents_deletion( - writer, - index.main, - index.documents_fields, - index.postings_lists, - index.docs_words, - &schema, - ranked_map, - documents, - ); + let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; + + let result = match index.main.schema(writer)? { + Some(schema) => apply_documents_deletion( + writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + &schema, + ranked_map, + documents, + ), + None => Err(Error::SchemaMissing), + }; (update_type, result, start.elapsed()) },