Make possible to create an index and add a schema later on

This commit is contained in:
Clément Renault 2019-10-07 17:48:26 +02:00
parent aa05459e4f
commit a57a64823e
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 137 additions and 43 deletions

View File

@ -16,6 +16,7 @@ pub enum Error {
RmpEncode(rmp_serde::encode::Error), RmpEncode(rmp_serde::encode::Error),
Bincode(bincode::Error), Bincode(bincode::Error),
Serializer(SerializerError), Serializer(SerializerError),
UnsupportedOperation(UnsupportedOperation),
} }
impl From<io::Error> for Error { impl From<io::Error> for Error {
@ -60,6 +61,12 @@ impl From<SerializerError> for Error {
} }
} }
impl From<UnsupportedOperation> for Error {
fn from(op: UnsupportedOperation) -> Error {
Error::UnsupportedOperation(op)
}
}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::Error::*; use self::Error::*;
@ -75,9 +82,23 @@ impl fmt::Display for Error {
RmpEncode(e) => write!(f, "rmp encode error; {}", e), RmpEncode(e) => write!(f, "rmp encode error; {}", e),
Bincode(e) => write!(f, "bincode error; {}", e), Bincode(e) => write!(f, "bincode error; {}", e),
Serializer(e) => write!(f, "serializer error; {}", e), Serializer(e) => write!(f, "serializer error; {}", e),
UnsupportedOperation(op) => write!(f, "unsupported operation; {}", op),
} }
} }
} }
impl error::Error for Error { } impl error::Error for Error { }
#[derive(Debug)]
pub enum UnsupportedOperation {
SchemaAlreadyExists,
}
impl fmt::Display for UnsupportedOperation {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::UnsupportedOperation::*;
match self {
SchemaAlreadyExists => write!(f, "Cannot update index which already have a schema"),
}
}
}

View File

@ -1,6 +1,10 @@
use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions};
use std::{fs, path::Path}; use std::{fs, path::Path};
use serde_json::json;
use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions};
use meilidb_core::{Database, MResult, QueryBuilder}; use meilidb_core::{Database, MResult, QueryBuilder};
use meilidb_schema::{SchemaBuilder, DISPLAYED, INDEXED};
fn main() -> MResult<()> { fn main() -> MResult<()> {
env_logger::init(); env_logger::init();
@ -13,8 +17,24 @@ fn main() -> MResult<()> {
let hello1 = database.open_index("hello1")?; let hello1 = database.open_index("hello1")?;
let hello2 = database.open_index("hello2")?; let hello2 = database.open_index("hello2")?;
let mut builder = SchemaBuilder::with_identifier("id");
builder.new_attribute("alpha", DISPLAYED);
builder.new_attribute("beta", DISPLAYED | INDEXED);
builder.new_attribute("gamma", INDEXED);
let schema = builder.build();
let rkv = database.rkv.read().unwrap();
let writer = rkv.write()?;
hello.schema_update(writer, schema)?;
let object = json!({
"id": 23,
"alpha": "hello",
});
let mut additions = hello.documents_addition(); let mut additions = hello.documents_addition();
additions.extend(vec![()]); additions.extend(vec![object]);
let rkv = database.rkv.read().unwrap(); let rkv = database.rkv.read().unwrap();
let writer = rkv.write()?; let writer = rkv.write()?;
@ -53,7 +73,7 @@ fn main() -> MResult<()> {
// println!("{:?}", documents); // println!("{:?}", documents);
std::thread::sleep(std::time::Duration::from_secs(10)); std::thread::sleep(std::time::Duration::from_secs(2));
Ok(()) Ok(())
} }

View File

@ -14,7 +14,8 @@ pub use self::synonyms::Synonyms;
pub use self::updates::Updates; pub use self::updates::Updates;
pub use self::updates_results::UpdatesResults; pub use self::updates_results::UpdatesResults;
use crate::update; use meilidb_schema::Schema;
use crate::{update, MResult};
fn aligned_to(bytes: &[u8], align: usize) -> bool { fn aligned_to(bytes: &[u8], align: usize) -> bool {
(bytes as *const _ as *const () as usize) % align == 0 (bytes as *const _ as *const () as usize) % align == 0
@ -62,6 +63,13 @@ pub struct Index {
} }
impl Index { impl Index {
pub fn schema_update(&self, mut writer: rkv::Writer, schema: Schema) -> MResult<()> {
update::push_schema_update(&mut writer, self.updates, self.updates_results, schema)?;
writer.commit()?;
let _ = self.updates_notifier.send(());
Ok(())
}
pub fn documents_addition<D>(&self) -> update::DocumentsAddition<D> { pub fn documents_addition<D>(&self) -> update::DocumentsAddition<D> {
update::DocumentsAddition::new( update::DocumentsAddition::new(
self.updates, self.updates,

View File

@ -9,7 +9,7 @@ use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::store; use crate::store;
use crate::update::{push_documents_addition, apply_documents_deletion}; use crate::update::{push_documents_addition, apply_documents_deletion};
use crate::{Error, RankedMap}; use crate::{MResult, Error, RankedMap};
pub struct DocumentsAddition<D> { pub struct DocumentsAddition<D> {
updates_store: store::Updates, updates_store: store::Updates,
@ -37,7 +37,7 @@ impl<D> DocumentsAddition<D> {
self.documents.push(document); self.documents.push(document);
} }
pub fn finalize(self, mut writer: rkv::Writer) -> Result<u64, Error> pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64>
where D: serde::Serialize where D: serde::Serialize
{ {
let update_id = push_documents_addition( let update_id = push_documents_addition(
@ -65,15 +65,19 @@ pub fn apply_documents_addition(
documents_fields_store: store::DocumentsFields, documents_fields_store: store::DocumentsFields,
postings_lists_store: store::PostingsLists, postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords, docs_words_store: store::DocsWords,
schema: &Schema,
mut ranked_map: RankedMap, mut ranked_map: RankedMap,
addition: Vec<rmpv::Value>, addition: Vec<rmpv::Value>,
) -> Result<(), Error> ) -> MResult<()>
{ {
let mut document_ids = HashSet::new(); let mut document_ids = HashSet::new();
let mut document_store = RamDocumentStore::new(); let mut document_store = RamDocumentStore::new();
let mut indexer = RawIndexer::new(); let mut indexer = RawIndexer::new();
let schema = match main_store.schema(writer)? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
let identifier = schema.identifier_name(); let identifier = schema.identifier_name();
for document in addition { for document in addition {
@ -87,7 +91,7 @@ pub fn apply_documents_addition(
// 2. index the document fields in ram stores // 2. index the document fields in ram stores
let serializer = Serializer { let serializer = Serializer {
schema, schema: &schema,
document_store: &mut document_store, document_store: &mut document_store,
indexer: &mut indexer, indexer: &mut indexer,
ranked_map: &mut ranked_map, ranked_map: &mut ranked_map,
@ -105,7 +109,6 @@ pub fn apply_documents_addition(
documents_fields_store, documents_fields_store,
postings_lists_store, postings_lists_store,
docs_words_store, docs_words_store,
schema,
ranked_map.clone(), ranked_map.clone(),
documents_to_insert, documents_to_insert,
)?; )?;

View File

@ -4,7 +4,7 @@ use fst::{SetBuilder, Streamer};
use meilidb_schema::Schema; use meilidb_schema::Schema;
use sdset::{SetBuf, SetOperation, duo::DifferenceByKey}; use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
use crate::{DocumentId, RankedMap, Error}; use crate::{DocumentId, RankedMap, MResult, Error};
use crate::serde::extract_document_id; use crate::serde::extract_document_id;
use crate::update::push_documents_deletion; use crate::update::push_documents_deletion;
use crate::store; use crate::store;
@ -35,7 +35,7 @@ impl DocumentsDeletion {
self.documents.push(document_id); self.documents.push(document_id);
} }
pub fn delete_document<D>(&mut self, schema: &Schema, document: D) -> Result<(), Error> pub fn delete_document<D>(&mut self, schema: &Schema, document: D) -> MResult<()>
where D: serde::Serialize, where D: serde::Serialize,
{ {
let identifier = schema.identifier_name(); let identifier = schema.identifier_name();
@ -49,7 +49,7 @@ impl DocumentsDeletion {
Ok(()) Ok(())
} }
pub fn finalize(self, mut writer: rkv::Writer) -> Result<u64, Error> { pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64> {
let update_id = push_documents_deletion( let update_id = push_documents_deletion(
&mut writer, &mut writer,
self.updates_store, self.updates_store,
@ -75,13 +75,17 @@ pub fn apply_documents_deletion(
documents_fields_store: store::DocumentsFields, documents_fields_store: store::DocumentsFields,
postings_lists_store: store::PostingsLists, postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords, docs_words_store: store::DocsWords,
schema: &Schema,
mut ranked_map: RankedMap, mut ranked_map: RankedMap,
deletion: Vec<DocumentId>, deletion: Vec<DocumentId>,
) -> Result<(), Error> ) -> MResult<()>
{ {
let idset = SetBuf::from_dirty(deletion); let idset = SetBuf::from_dirty(deletion);
let schema = match main_store.schema(writer)? {
Some(schema) => schema,
None => return Err(Error::SchemaMissing),
};
// collect the ranked attributes according to the schema // collect the ranked attributes according to the schema
let ranked_attrs: Vec<_> = schema.iter() let ranked_attrs: Vec<_> = schema.iter()
.filter_map(|(_, attr, prop)| { .filter_map(|(_, attr, prop)| {

View File

@ -1,22 +1,30 @@
mod documents_addition; mod documents_addition;
mod documents_deletion; mod documents_deletion;
mod schema_update;
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
pub use self::schema_update::apply_schema_update;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use log::debug; use log::debug;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use crate::{store, Error, MResult, DocumentId, RankedMap}; use crate::{store, Error, MResult, DocumentId, RankedMap};
use crate::error::UnsupportedOperation;
use meilidb_schema::Schema;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Update { pub enum Update {
SchemaUpdate(Schema),
DocumentsAddition(Vec<rmpv::Value>), DocumentsAddition(Vec<rmpv::Value>),
DocumentsDeletion(Vec<DocumentId>), DocumentsDeletion(Vec<DocumentId>),
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateType { pub enum UpdateType {
SchemaUpdate { schema: Schema },
DocumentsAddition { number: usize }, DocumentsAddition { number: usize },
DocumentsDeletion { number: usize }, DocumentsDeletion { number: usize },
} }
@ -77,6 +85,22 @@ pub fn biggest_update_id(
Ok(max) Ok(max)
} }
pub fn push_schema_update(
writer: &mut rkv::Writer,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
schema: Schema,
) -> MResult<u64>
{
let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?;
let last_update_id = last_update_id.map_or(0, |n| n + 1);
let update = Update::SchemaUpdate(schema);
let update_id = updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn push_documents_addition<D: serde::Serialize>( pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,
@ -127,9 +151,14 @@ pub fn update_task(
None => return Ok(false), None => return Ok(false),
}; };
debug!("Processing update number {}", update_id);
let (update_type, result, duration) = match update { let (update_type, result, duration) = match update {
Update::SchemaUpdate(schema) => {
let start = Instant::now();
let update_type = UpdateType::SchemaUpdate { schema: schema.clone() };
let result = apply_schema_update(writer, index.main, &schema);
(update_type, result, start.elapsed())
},
Update::DocumentsAddition(documents) => { Update::DocumentsAddition(documents) => {
let start = Instant::now(); let start = Instant::now();
@ -140,19 +169,15 @@ pub fn update_task(
let update_type = UpdateType::DocumentsAddition { number: documents.len() }; let update_type = UpdateType::DocumentsAddition { number: documents.len() };
let result = match index.main.schema(writer)? { let result = apply_documents_addition(
Some(schema) => apply_documents_addition(
writer, writer,
index.main, index.main,
index.documents_fields, index.documents_fields,
index.postings_lists, index.postings_lists,
index.docs_words, index.docs_words,
&schema,
ranked_map, ranked_map,
documents, documents,
), );
None => Err(Error::SchemaMissing),
};
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
}, },
@ -166,24 +191,22 @@ pub fn update_task(
let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
let result = match index.main.schema(writer)? { let result = apply_documents_deletion(
Some(schema) => apply_documents_deletion(
writer, writer,
index.main, index.main,
index.documents_fields, index.documents_fields,
index.postings_lists, index.postings_lists,
index.docs_words, index.docs_words,
&schema,
ranked_map, ranked_map,
documents, documents,
), );
None => Err(Error::SchemaMissing),
};
(update_type, result, start.elapsed()) (update_type, result, start.elapsed())
}, },
}; };
debug!("Processed update number {} {:?} {:?}", update_id, update_type, result);
let detailed_duration = DetailedDuration { main: duration }; let detailed_duration = DetailedDuration { main: duration };
let status = UpdateResult { let status = UpdateResult {
update_id, update_id,

View File

@ -0,0 +1,15 @@
use meilidb_schema::Schema;
use crate::{store, error::UnsupportedOperation, MResult};
pub fn apply_schema_update(
writer: &mut rkv::Writer,
main_store: store::Main,
new_schema: &Schema,
) -> MResult<()>
{
if let Some(_) = main_store.schema(writer)? {
return Err(UnsupportedOperation::SchemaAlreadyExists.into())
}
main_store.put_schema(writer, new_schema)
}