From a57a64823e977094a43c5463b689b55e29c60536 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 7 Oct 2019 17:48:26 +0200 Subject: [PATCH] Make possible to create an index and add a schema later on --- meilidb-core/src/error.rs | 21 +++++ meilidb-core/src/main.rs | 26 +++++- meilidb-core/src/store/mod.rs | 10 ++- meilidb-core/src/update/documents_addition.rs | 15 ++-- meilidb-core/src/update/documents_deletion.rs | 14 ++-- meilidb-core/src/update/mod.rs | 79 ++++++++++++------- meilidb-core/src/update/schema_update.rs | 15 ++++ 7 files changed, 137 insertions(+), 43 deletions(-) create mode 100644 meilidb-core/src/update/schema_update.rs diff --git a/meilidb-core/src/error.rs b/meilidb-core/src/error.rs index ae3ffd884..da858b917 100644 --- a/meilidb-core/src/error.rs +++ b/meilidb-core/src/error.rs @@ -16,6 +16,7 @@ pub enum Error { RmpEncode(rmp_serde::encode::Error), Bincode(bincode::Error), Serializer(SerializerError), + UnsupportedOperation(UnsupportedOperation), } impl From for Error { @@ -60,6 +61,12 @@ impl From for Error { } } +impl From for Error { + fn from(op: UnsupportedOperation) -> Error { + Error::UnsupportedOperation(op) + } +} + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use self::Error::*; @@ -75,9 +82,23 @@ impl fmt::Display for Error { RmpEncode(e) => write!(f, "rmp encode error; {}", e), Bincode(e) => write!(f, "bincode error; {}", e), Serializer(e) => write!(f, "serializer error; {}", e), + UnsupportedOperation(op) => write!(f, "unsupported operation; {}", op), } } } impl error::Error for Error { } +#[derive(Debug)] +pub enum UnsupportedOperation { + SchemaAlreadyExists, +} + +impl fmt::Display for UnsupportedOperation { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use self::UnsupportedOperation::*; + match self { + SchemaAlreadyExists => write!(f, "Cannot update index which already have a schema"), + } + } +} diff --git a/meilidb-core/src/main.rs b/meilidb-core/src/main.rs index 247edbff4..90ff2723a 100644 --- a/meilidb-core/src/main.rs +++ b/meilidb-core/src/main.rs @@ -1,6 +1,10 @@ -use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions}; use std::{fs, path::Path}; + +use serde_json::json; +use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions}; + use meilidb_core::{Database, MResult, QueryBuilder}; +use meilidb_schema::{SchemaBuilder, DISPLAYED, INDEXED}; fn main() -> MResult<()> { env_logger::init(); @@ -13,8 +17,24 @@ fn main() -> MResult<()> { let hello1 = database.open_index("hello1")?; let hello2 = database.open_index("hello2")?; + let mut builder = SchemaBuilder::with_identifier("id"); + builder.new_attribute("alpha", DISPLAYED); + builder.new_attribute("beta", DISPLAYED | INDEXED); + builder.new_attribute("gamma", INDEXED); + let schema = builder.build(); + + let rkv = database.rkv.read().unwrap(); + let writer = rkv.write()?; + + hello.schema_update(writer, schema)?; + + let object = json!({ + "id": 23, + "alpha": "hello", + }); + let mut additions = hello.documents_addition(); - additions.extend(vec![()]); + additions.extend(vec![object]); let rkv = database.rkv.read().unwrap(); let writer = rkv.write()?; @@ -53,7 +73,7 @@ fn main() -> MResult<()> { // println!("{:?}", documents); - std::thread::sleep(std::time::Duration::from_secs(10)); + std::thread::sleep(std::time::Duration::from_secs(2)); Ok(()) } diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index c57b82131..8b596edd9 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -14,7 +14,8 @@ pub use self::synonyms::Synonyms; pub use self::updates::Updates; pub use self::updates_results::UpdatesResults; -use crate::update; +use meilidb_schema::Schema; +use crate::{update, MResult}; fn aligned_to(bytes: &[u8], align: usize) -> bool { (bytes as *const _ as *const () as usize) % align == 0 @@ -62,6 +63,13 @@ pub struct Index { } impl Index { + pub fn schema_update(&self, mut writer: rkv::Writer, schema: Schema) -> MResult<()> { + update::push_schema_update(&mut writer, self.updates, self.updates_results, schema)?; + writer.commit()?; + let _ = self.updates_notifier.send(()); + Ok(()) + } + pub fn documents_addition(&self) -> update::DocumentsAddition { update::DocumentsAddition::new( self.updates, diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 16de42138..c45750a59 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -9,7 +9,7 @@ use crate::raw_indexer::RawIndexer; use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; use crate::store; use crate::update::{push_documents_addition, apply_documents_deletion}; -use crate::{Error, RankedMap}; +use crate::{MResult, Error, RankedMap}; pub struct DocumentsAddition { updates_store: store::Updates, @@ -37,7 +37,7 @@ impl DocumentsAddition { self.documents.push(document); } - pub fn finalize(self, mut writer: rkv::Writer) -> Result + pub fn finalize(self, mut writer: rkv::Writer) -> MResult where D: serde::Serialize { let update_id = push_documents_addition( @@ -65,15 +65,19 @@ pub fn apply_documents_addition( documents_fields_store: store::DocumentsFields, postings_lists_store: store::PostingsLists, docs_words_store: store::DocsWords, - schema: &Schema, mut ranked_map: RankedMap, addition: Vec, -) -> Result<(), Error> +) -> MResult<()> { let mut document_ids = HashSet::new(); let mut document_store = RamDocumentStore::new(); let mut indexer = RawIndexer::new(); + let schema = match main_store.schema(writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + let identifier = schema.identifier_name(); for document in addition { @@ -87,7 +91,7 @@ pub fn apply_documents_addition( // 2. index the document fields in ram stores let serializer = Serializer { - schema, + schema: &schema, document_store: &mut document_store, indexer: &mut indexer, ranked_map: &mut ranked_map, @@ -105,7 +109,6 @@ pub fn apply_documents_addition( documents_fields_store, postings_lists_store, docs_words_store, - schema, ranked_map.clone(), documents_to_insert, )?; diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs index 21cc3465e..896365978 100644 --- a/meilidb-core/src/update/documents_deletion.rs +++ b/meilidb-core/src/update/documents_deletion.rs @@ -4,7 +4,7 @@ use fst::{SetBuilder, Streamer}; use meilidb_schema::Schema; use sdset::{SetBuf, SetOperation, duo::DifferenceByKey}; -use crate::{DocumentId, RankedMap, Error}; +use crate::{DocumentId, RankedMap, MResult, Error}; use crate::serde::extract_document_id; use crate::update::push_documents_deletion; use crate::store; @@ -35,7 +35,7 @@ impl DocumentsDeletion { self.documents.push(document_id); } - pub fn delete_document(&mut self, schema: &Schema, document: D) -> Result<(), Error> + pub fn delete_document(&mut self, schema: &Schema, document: D) -> MResult<()> where D: serde::Serialize, { let identifier = schema.identifier_name(); @@ -49,7 +49,7 @@ impl DocumentsDeletion { Ok(()) } - pub fn finalize(self, mut writer: rkv::Writer) -> Result { + pub fn finalize(self, mut writer: rkv::Writer) -> MResult { let update_id = push_documents_deletion( &mut writer, self.updates_store, @@ -75,13 +75,17 @@ pub fn apply_documents_deletion( documents_fields_store: store::DocumentsFields, postings_lists_store: store::PostingsLists, docs_words_store: store::DocsWords, - schema: &Schema, mut ranked_map: RankedMap, deletion: Vec, -) -> Result<(), Error> +) -> MResult<()> { let idset = SetBuf::from_dirty(deletion); + let schema = match main_store.schema(writer)? { + Some(schema) => schema, + None => return Err(Error::SchemaMissing), + }; + // collect the ranked attributes according to the schema let ranked_attrs: Vec<_> = schema.iter() .filter_map(|(_, attr, prop)| { diff --git a/meilidb-core/src/update/mod.rs b/meilidb-core/src/update/mod.rs index 1e9c9c00d..056f7a003 100644 --- a/meilidb-core/src/update/mod.rs +++ b/meilidb-core/src/update/mod.rs @@ -1,22 +1,30 @@ mod documents_addition; mod documents_deletion; +mod schema_update; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; +pub use self::schema_update::apply_schema_update; use std::time::{Duration, Instant}; + use log::debug; use serde::{Serialize, Deserialize}; + use crate::{store, Error, MResult, DocumentId, RankedMap}; +use crate::error::UnsupportedOperation; +use meilidb_schema::Schema; #[derive(Debug, Serialize, Deserialize)] pub enum Update { + SchemaUpdate(Schema), DocumentsAddition(Vec), DocumentsDeletion(Vec), } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateType { + SchemaUpdate { schema: Schema }, DocumentsAddition { number: usize }, DocumentsDeletion { number: usize }, } @@ -77,6 +85,22 @@ pub fn biggest_update_id( Ok(max) } +pub fn push_schema_update( + writer: &mut rkv::Writer, + updates_store: store::Updates, + updates_results_store: store::UpdatesResults, + schema: Schema, +) -> MResult +{ + let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; + let last_update_id = last_update_id.map_or(0, |n| n + 1); + + let update = Update::SchemaUpdate(schema); + let update_id = updates_store.put_update(writer, last_update_id, &update)?; + + Ok(last_update_id) +} + pub fn push_documents_addition( writer: &mut rkv::Writer, updates_store: store::Updates, @@ -127,9 +151,14 @@ pub fn update_task( None => return Ok(false), }; - debug!("Processing update number {}", update_id); - let (update_type, result, duration) = match update { + Update::SchemaUpdate(schema) => { + let start = Instant::now(); + let update_type = UpdateType::SchemaUpdate { schema: schema.clone() }; + let result = apply_schema_update(writer, index.main, &schema); + + (update_type, result, start.elapsed()) + }, Update::DocumentsAddition(documents) => { let start = Instant::now(); @@ -140,19 +169,15 @@ pub fn update_task( let update_type = UpdateType::DocumentsAddition { number: documents.len() }; - let result = match index.main.schema(writer)? { - Some(schema) => apply_documents_addition( - writer, - index.main, - index.documents_fields, - index.postings_lists, - index.docs_words, - &schema, - ranked_map, - documents, - ), - None => Err(Error::SchemaMissing), - }; + let result = apply_documents_addition( + writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + ranked_map, + documents, + ); (update_type, result, start.elapsed()) }, @@ -166,24 +191,22 @@ pub fn update_task( let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; - let result = match index.main.schema(writer)? { - Some(schema) => apply_documents_deletion( - writer, - index.main, - index.documents_fields, - index.postings_lists, - index.docs_words, - &schema, - ranked_map, - documents, - ), - None => Err(Error::SchemaMissing), - }; + let result = apply_documents_deletion( + writer, + index.main, + index.documents_fields, + index.postings_lists, + index.docs_words, + ranked_map, + documents, + ); (update_type, result, start.elapsed()) }, }; + debug!("Processed update number {} {:?} {:?}", update_id, update_type, result); + let detailed_duration = DetailedDuration { main: duration }; let status = UpdateResult { update_id, diff --git a/meilidb-core/src/update/schema_update.rs b/meilidb-core/src/update/schema_update.rs new file mode 100644 index 000000000..2ebec3b39 --- /dev/null +++ b/meilidb-core/src/update/schema_update.rs @@ -0,0 +1,15 @@ +use meilidb_schema::Schema; +use crate::{store, error::UnsupportedOperation, MResult}; + +pub fn apply_schema_update( + writer: &mut rkv::Writer, + main_store: store::Main, + new_schema: &Schema, +) -> MResult<()> +{ + if let Some(_) = main_store.schema(writer)? { + return Err(UnsupportedOperation::SchemaAlreadyExists.into()) + } + + main_store.put_schema(writer, new_schema) +}