From e48630da72ab947a617c131e873bbbfdf877ccef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Tue, 3 Nov 2020 13:20:11 +0100
Subject: [PATCH] Introduce the searchable parameter settings to the Settings
 update

---
 src/subcommand/serve.rs                 |  12 +-
 src/update/index_documents/mod.rs       |  29 +++--
 src/update/index_documents/transform.rs |  53 ++++++++
 src/update/settings.rs                  | 156 ++++++++++++++++++++++--
 src/update/update_builder.rs            |  15 ++-
 5 files changed, 243 insertions(+), 22 deletions(-)

diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs
index 1b2ee48de..57cc2e012 100644
--- a/src/subcommand/serve.rs
+++ b/src/subcommand/serve.rs
@@ -308,7 +308,17 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
                     }
                 }
 
-                match builder.execute() {
+                let result = builder.execute(|count, total| {
+                    let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
+                        update_id,
+                        meta: UpdateMetaProgress::DocumentsAddition {
+                            processed_number_of_documents: count,
+                            total_number_of_documents: Some(total),
+                        }
+                    });
+                });
+
+                match result {
                     Ok(_count) => wtxn.commit().map_err(Into::into),
                     Err(e) => Err(e.into())
                 }
diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs
index 1606ecd32..7dd3b6611 100644
--- a/src/update/index_documents/mod.rs
+++ b/src/update/index_documents/mod.rs
@@ -9,8 +9,10 @@ use bstr::ByteSlice as _;
 use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
 use heed::types::ByteSlice;
 use log::{debug, info, error};
+use memmap::Mmap;
 use rayon::prelude::*;
 use rayon::ThreadPool;
+
 use crate::index::Index;
 use self::store::Store;
 use self::merge_function::{
@@ -248,7 +250,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         R: io::Read,
         F: Fn(usize, usize) + Sync,
     {
-        let before_indexing = Instant::now();
+        let before_transform = Instant::now();
 
         let transform = Transform {
             rtxn: &self.wtxn,
@@ -268,6 +270,17 @@
             UpdateFormat::JsonStream => transform.from_json_stream(reader)?,
         };
 
+        info!("Update transformed in {:.02?}", before_transform.elapsed());
+
+        self.execute_raw(output, progress_callback)
+    }
+
+    pub fn execute_raw<F>(self, output: TransformOutput, progress_callback: F) -> anyhow::Result<()>
+    where
+        F: Fn(usize, usize) + Sync
+    {
+        let before_indexing = Instant::now();
+
         let TransformOutput {
             primary_key,
             fields_ids_map,
@@ -296,16 +309,14 @@
             let _deleted_documents_count = deletion_builder.execute()?;
         }
 
-        let mmap = if documents_count == 0 {
-            None
+        let mmap;
+        let bytes = if documents_count == 0 {
+            &[][..]
         } else {
-            let mmap = unsafe {
-                memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")?
-            };
-            Some(mmap)
+            mmap = unsafe { Mmap::map(&documents_file).context("mmaping the transform documents file")? };
+            &mmap
         };
 
-        let bytes = mmap.as_ref().map(AsRef::as_ref).unwrap_or_default();
         let documents = grenad::Reader::new(bytes).unwrap();
 
         // The enum which indicates the type of the readers
@@ -492,7 +503,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             }
         }
 
-        info!("Update processed in {:.02?}", before_indexing.elapsed());
+        info!("Transform output indexed in {:.02?}", before_indexing.elapsed());
 
         Ok(())
     }
diff --git a/src/update/index_documents/transform.rs b/src/update/index_documents/transform.rs
index ff71928cb..cb583849a 100644
--- a/src/update/index_documents/transform.rs
+++ b/src/update/index_documents/transform.rs
@@ -404,6 +404,59 @@ impl Transform<'_, '_> {
             documents_file,
         })
     }
+
+    /// Returns a `TransformOutput` with a file that contains the documents of the index
+    /// with the attributes reordered according to the `FieldsIdsMap` given as argument.
+    // TODO this can be done in parallel by using the rayon `ThreadPool`.
+    pub fn remap_index_documents(
+        self,
+        primary_key: u8,
+        fields_ids_map: FieldsIdsMap,
+    ) -> anyhow::Result<TransformOutput>
+    {
+        let current_fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
+        let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn)?;
+        let documents_ids = self.index.documents_ids(self.rtxn)?;
+        let documents_count = documents_ids.len() as usize;
+
+        // We create a final writer to write the new documents in order from the sorter.
+        let file = tempfile::tempfile()?;
+        let mut writer = create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?;
+
+        let mut obkv_buffer = Vec::new();
+        for result in self.index.documents.iter(self.rtxn)? {
+            let (docid, obkv) = result?;
+            let docid = docid.get();
+
+            obkv_buffer.clear();
+            let mut obkv_writer = obkv::KvWriter::new(&mut obkv_buffer);
+
+            // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
+            for (id, name) in fields_ids_map.iter() {
+                if let Some(val) = current_fields_ids_map.id(name).and_then(|id| obkv.get(id)) {
+                    obkv_writer.insert(id, val)?;
+                }
+            }
+
+            let buffer = obkv_writer.into_inner()?;
+            writer.insert(docid.to_be_bytes(), buffer)?;
+        }
+
+        // Once we have written all the documents, we extract
+        // the file and reset the seek to be able to read it again.
+        let mut documents_file = writer.into_inner()?;
+        documents_file.seek(SeekFrom::Start(0))?;
+
+        Ok(TransformOutput {
+            primary_key,
+            fields_ids_map,
+            users_ids_documents_ids: users_ids_documents_ids.map_data(Cow::into_owned)?,
+            new_documents_ids: documents_ids,
+            replaced_documents_ids: RoaringBitmap::default(),
+            documents_count,
+            documents_file,
+        })
+    }
 }
 
 /// Only the last value associated with an id is kept.
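
One subtlety in the `execute_raw` hunk above is easy to misread in diff form:
`mmap` is declared uninitialized in the outer scope so that the byte slice
borrowed from it outlives the `if`/`else`, while the empty-index case borrows
a `'static` empty slice rather than mapping the empty documents file, which can
fail. A minimal standalone sketch of that deferred-initialization pattern, with
a `Vec` standing in for the `Mmap` (all names here are illustrative, not from
the patch):

    fn main() {
        let documents_count = 3;

        let backing; // assigned on exactly one path, alive until end of scope
        let bytes: &[u8] = if documents_count == 0 {
            &[][..]
        } else {
            backing = vec![0u8; documents_count];
            &backing
        };

        // `bytes` borrows `backing` past the end of the `if`/`else`.
        assert_eq!(bytes.len(), documents_count);
    }
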
diff --git a/src/update/settings.rs b/src/update/settings.rs
index 0bfc7ff6d..b9abeb477 100644
--- a/src/update/settings.rs
+++ b/src/update/settings.rs
@@ -1,17 +1,45 @@
 use anyhow::Context;
-use crate::Index;
+use grenad::CompressionType;
+use rayon::ThreadPool;
 
-pub struct Settings<'t, 'u, 'i> {
+use crate::update::index_documents::{Transform, IndexDocumentsMethod};
+use crate::update::{ClearDocuments, IndexDocuments};
+use crate::{Index, FieldsIdsMap};
+
+pub struct Settings<'a, 't, 'u, 'i> {
     wtxn: &'t mut heed::RwTxn<'i, 'u>,
     index: &'i Index,
-    // If the field is set to `None` it means that it hasn't been set by the user,
+    pub(crate) log_every_n: Option<usize>,
+    pub(crate) max_nb_chunks: Option<usize>,
+    pub(crate) max_memory: Option<usize>,
+    pub(crate) linked_hash_map_size: Option<usize>,
+    pub(crate) chunk_compression_type: CompressionType,
+    pub(crate) chunk_compression_level: Option<u32>,
+    pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    pub(crate) thread_pool: Option<&'a ThreadPool>,
+
+    // If a struct field is set to `None` it means that it hasn't been set by the user,
     // however if it is `Some(None)` it means that the user forced a reset of the setting.
+    searchable_fields: Option<Option<Vec<String>>>,
     displayed_fields: Option<Option<Vec<String>>>,
 }
 
-impl<'t, 'u, 'i> Settings<'t, 'u, 'i> {
-    pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'t, 'u, 'i> {
-        Settings { wtxn, index, displayed_fields: None }
+impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
+    pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> {
+        Settings {
+            wtxn,
+            index,
+            log_every_n: None,
+            max_nb_chunks: None,
+            max_memory: None,
+            linked_hash_map_size: None,
+            chunk_compression_type: CompressionType::None,
+            chunk_compression_level: None,
+            chunk_fusing_shrink_size: None,
+            thread_pool: None,
+            searchable_fields: None,
+            displayed_fields: None,
+        }
     }
 
     pub fn reset_displayed_fields(&mut self) {
@@ -22,8 +50,116 @@
         self.displayed_fields = Some(Some(names));
     }
 
-    pub fn execute(self) -> anyhow::Result<()> {
-        // Check that the displayed attributes parameters has been specified.
+    pub fn execute<F>(self, progress_callback: F) -> anyhow::Result<()>
+    where
+        F: Fn(usize, usize) + Sync
+    {
+        // Check that the searchable attributes have been specified.
+        if let Some(value) = self.searchable_fields {
+            let current_searchable_fields = self.index.searchable_fields(self.wtxn)?;
+            let current_displayed_fields = self.index.displayed_fields(self.wtxn)?;
+            let current_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
+
+            let result = match value {
+                Some(fields_names) => {
+                    // We create or generate the fields ids corresponding to those names.
+                    let mut fields_ids_map = FieldsIdsMap::new();
+                    let mut searchable_fields = Vec::new();
+                    for name in fields_names {
+                        let id = fields_ids_map.insert(&name).context("field id limit reached")?;
+                        searchable_fields.push(id);
+                    }
+
+                    // We complete the new FieldsIdsMap with the previous names.
+                    for (_id, name) in current_fields_ids_map.iter() {
+                        fields_ids_map.insert(name).context("field id limit reached")?;
+                    }
+
+                    // We must also update the displayed fields according to the new `FieldsIdsMap`.
+                    let displayed_fields = match current_displayed_fields {
+                        Some(fields) => {
+                            let mut displayed_fields = Vec::new();
+                            for id in fields {
+                                let name = current_fields_ids_map.name(*id).unwrap();
+                                let id = fields_ids_map.id(name).context("field id limit reached")?;
+                                displayed_fields.push(id);
+                            }
+                            Some(displayed_fields)
+                        },
+                        None => None,
+                    };
+
+                    (fields_ids_map, Some(searchable_fields), displayed_fields)
+                },
+                None => (
+                    current_fields_ids_map.clone(),
+                    current_searchable_fields.map(ToOwned::to_owned),
+                    current_displayed_fields.map(ToOwned::to_owned),
+                ),
+            };
+
+            let (mut fields_ids_map, searchable_fields, displayed_fields) = result;
+
+            let transform = Transform {
+                rtxn: &self.wtxn,
+                index: self.index,
+                chunk_compression_type: self.chunk_compression_type,
+                chunk_compression_level: self.chunk_compression_level,
+                chunk_fusing_shrink_size: self.chunk_fusing_shrink_size,
+                max_nb_chunks: self.max_nb_chunks,
+                max_memory: self.max_memory,
+                index_documents_method: IndexDocumentsMethod::ReplaceDocuments,
+                autogenerate_docids: false,
+            };
+
+            // We compute or generate the new primary key field id.
+            let primary_key = match self.index.primary_key(&self.wtxn)? {
+                Some(id) => {
+                    let name = current_fields_ids_map.name(id).unwrap();
+                    fields_ids_map.insert(name).context("field id limit reached")?
+                },
+                None => fields_ids_map.insert("id").context("field id limit reached")?,
+            };
+
+            // We remap the documents fields based on the new `FieldsIdsMap`.
+            let output = transform.remap_index_documents(primary_key, fields_ids_map.clone())?;
+
+            // We write the new FieldsIdsMap to the database
+            // this way next indexing methods will be based on that.
+            self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
+
+            // The new searchable fields are also written down to make sure
+            // that the IndexDocuments system takes only these ones into account.
+            match searchable_fields {
+                Some(fields) => self.index.put_searchable_fields(self.wtxn, &fields)?,
+                None => self.index.delete_searchable_fields(self.wtxn).map(drop)?,
+            }
+
+            // We write the displayed fields into the database here
+            // to make sure that the right fields are displayed.
+            match displayed_fields {
+                Some(fields) => self.index.put_displayed_fields(self.wtxn, &fields)?,
+                None => self.index.delete_displayed_fields(self.wtxn).map(drop)?,
+            }
+
+            // We clear the full database (words-fst, documents ids and documents content).
+            ClearDocuments::new(self.wtxn, self.index).execute()?;
+
+            // We index the generated `TransformOutput` which must contain
+            // all the documents with fields in the newly defined searchable order.
+            let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index);
+            indexing_builder.log_every_n = self.log_every_n;
+            indexing_builder.max_nb_chunks = self.max_nb_chunks;
+            indexing_builder.max_memory = self.max_memory;
+            indexing_builder.linked_hash_map_size = self.linked_hash_map_size;
+            indexing_builder.chunk_compression_type = self.chunk_compression_type;
+            indexing_builder.chunk_compression_level = self.chunk_compression_level;
+            indexing_builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+            indexing_builder.thread_pool = self.thread_pool;
+            indexing_builder.execute_raw(output, progress_callback)?;
+        }
+
+        // Check that the displayed attributes have been specified.
         if let Some(value) = self.displayed_fields {
             match value {
                 // If it has been set, and it was a list of fields names, we create
@@ -99,7 +235,7 @@ mod tests {
         // In the same transaction we change the displayed fields to be only the age.
         let mut builder = Settings::new(&mut wtxn, &index);
         builder.set_displayed_fields(vec!["age".into()]);
-        builder.execute().unwrap();
+        builder.execute(|_, _| ()).unwrap();
         wtxn.commit().unwrap();
 
         // Check that the displayed fields are correctly set to only the "age" field.
@@ -114,7 +250,7 @@ mod tests {
         let mut wtxn = index.write_txn().unwrap();
         let mut builder = Settings::new(&mut wtxn, &index);
         builder.reset_displayed_fields();
-        builder.execute().unwrap();
+        builder.execute(|_, _| ()).unwrap();
         wtxn.commit().unwrap();
 
         // Check that the displayed fields are correctly set to `None` (default value).
diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs
index a4a70be91..67ea04bfc 100644
--- a/src/update/update_builder.rs
+++ b/src/update/update_builder.rs
@@ -103,8 +103,19 @@ impl<'a> UpdateBuilder<'a> {
         self,
         wtxn: &'t mut heed::RwTxn<'i, 'u>,
         index: &'i Index,
-    ) -> Settings<'t, 'u, 'i>
+    ) -> Settings<'a, 't, 'u, 'i>
     {
-        Settings::new(wtxn, index)
+        let mut builder = Settings::new(wtxn, index);
+
+        builder.log_every_n = self.log_every_n;
+        builder.max_nb_chunks = self.max_nb_chunks;
+        builder.max_memory = self.max_memory;
+        builder.linked_hash_map_size = self.linked_hash_map_size;
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        builder.thread_pool = self.thread_pool;
+
+        builder
    }
 }
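
With these pieces in place, `Settings::execute` now takes a progress callback
and, when the searchable fields change, drives a full remap-and-reindex pass.
A usage sketch mirroring the updated tests, assuming an `Index` handle from
the surrounding crate (the `log::info!` call is illustrative, not part of the
patch):

    use crate::update::Settings;
    use crate::Index;

    fn set_displayed_to_age(index: &Index) -> anyhow::Result<()> {
        let mut wtxn = index.write_txn()?;

        let mut builder = Settings::new(&mut wtxn, index);
        builder.set_displayed_fields(vec!["age".into()]);

        // The callback receives the processed and total document counts, which
        // callers such as the `serve` subcommand forward to their clients as
        // `UpdateStatus::Progressing` events.
        builder.execute(|processed, total| {
            log::info!("settings update: {}/{} documents", processed, total);
        })?;

        wtxn.commit()?;
        Ok(())
    }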