From 0a731973b91f49c9e9bc4eb3d9ec3ef569e82857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 3 Oct 2019 16:13:09 +0200 Subject: [PATCH] Made many stores do their jobs --- src/store/docs_words.rs | 34 +++++++++++----- src/store/documents_fields.rs | 73 +++++++++++++++++++++++++++-------- src/store/main.rs | 51 ++++++++++++++++++++---- src/store/mod.rs | 16 +++++--- src/store/updates.rs | 11 ++++-- src/store/updates_results.rs | 31 +++++++++++++++ src/update/mod.rs | 25 +++++++++++- 7 files changed, 195 insertions(+), 46 deletions(-) create mode 100644 src/store/updates_results.rs diff --git a/src/store/docs_words.rs b/src/store/docs_words.rs index 330051298..40a6d0a5c 100644 --- a/src/store/docs_words.rs +++ b/src/store/docs_words.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; +use rkv::Value; use crate::DocumentId; #[derive(Copy, Clone)] @@ -6,15 +8,6 @@ pub struct DocsWords { } impl DocsWords { - pub fn doc_words( - &self, - reader: &T, - document_id: DocumentId, - ) -> Result, rkv::StoreError> - { - Ok(Some(fst::Set::default())) - } - pub fn put_doc_words( &self, writer: &mut rkv::Writer, @@ -22,7 +15,9 @@ impl DocsWords { words: &fst::Set, ) -> Result<(), rkv::StoreError> { - unimplemented!() + let document_id_bytes = document_id.0.to_be_bytes(); + let bytes = words.as_fst().as_bytes(); + self.docs_words.put(writer, document_id_bytes, &Value::Blob(bytes)) } pub fn del_doc_words( @@ -34,4 +29,23 @@ impl DocsWords { let document_id_bytes = document_id.0.to_be_bytes(); self.docs_words.delete(writer, document_id_bytes) } + + pub fn doc_words( + &self, + reader: &T, + document_id: DocumentId, + ) -> Result, rkv::StoreError> + { + let document_id_bytes = document_id.0.to_be_bytes(); + match self.docs_words.get(reader, document_id_bytes)? { + Some(Value::Blob(bytes)) => { + let len = bytes.len(); + let bytes = Arc::from(bytes); + let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); + Ok(Some(fst::Set::from(fst))) + }, + Some(value) => panic!("invalid type {:?}", value), + None => Ok(None), + } + } } diff --git a/src/store/documents_fields.rs b/src/store/documents_fields.rs index ca6fd67fc..a5f41876e 100644 --- a/src/store/documents_fields.rs +++ b/src/store/documents_fields.rs @@ -7,35 +7,76 @@ pub struct DocumentsFields { pub(crate) documents_fields: rkv::SingleStore, } -impl DocumentsFields { - pub fn del_all_document_fields( - &self, - writer: &mut rkv::Writer, - document_id: DocumentId, - ) -> Result - { - unimplemented!() - } +fn document_attribute_into_key(document_id: DocumentId, attribute: SchemaAttr) -> [u8; 10] { + let document_id_bytes = document_id.0.to_be_bytes(); + let attr_bytes = attribute.0.to_be_bytes(); + let mut key = [0u8; 10]; + key[0..8].copy_from_slice(&document_id_bytes); + key[8..10].copy_from_slice(&attr_bytes); + + key +} + +impl DocumentsFields { pub fn put_document_field( &self, writer: &mut rkv::Writer, document_id: DocumentId, attribute: SchemaAttr, value: &[u8], - ) -> Result, rkv::StoreError> + ) -> Result<(), rkv::StoreError> { - unimplemented!() + let key = document_attribute_into_key(document_id, attribute); + self.documents_fields.put(writer, key, &rkv::Value::Blob(value)) } - pub fn document_field( + pub fn del_all_document_fields( &self, - reader: &T, + writer: &mut rkv::Writer, + document_id: DocumentId, + ) -> Result + { + let document_id_bytes = document_id.0.to_be_bytes(); + let mut keys_to_delete = Vec::new(); + + // WARN we can not delete the keys using the iterator + // so we store them and delete them just after + let iter = self.documents_fields.iter_from(writer, document_id_bytes)?; + for result in iter { + let (key, _) = result?; + let current_document_id = { + let bytes = key.get(0..8).unwrap(); + let array = TryFrom::try_from(bytes).unwrap(); + DocumentId(u64::from_be_bytes(array)) + }; + + if current_document_id != document_id { break } + keys_to_delete.push(key.to_owned()); + } + + let count = keys_to_delete.len(); + for key in keys_to_delete { + self.documents_fields.delete(writer, key)?; + } + + Ok(count) + } + + pub fn document_field<'a, T: rkv::Readable>( + &self, + reader: &'a T, document_id: DocumentId, attribute: SchemaAttr, - ) -> Result, rkv::StoreError> + ) -> Result, rkv::StoreError> { - unimplemented!() + let key = document_attribute_into_key(document_id, attribute); + + match self.documents_fields.get(reader, key)? { + Some(rkv::Value::Blob(bytes)) => Ok(Some(bytes)), + Some(value) => panic!("invalid type {:?}", value), + None => Ok(None), + } } pub fn document_fields<'r, T: rkv::Readable>( @@ -63,7 +104,7 @@ impl<'r, T: rkv::Readable + 'r> Iterator for DocumentFieldsIter<'r, T> { match self.iter.next() { Some(Ok((key, Some(rkv::Value::Blob(bytes))))) => { let bytes = key.get(8..8+2).unwrap(); - let array = <[u8; 2]>::try_from(bytes).unwrap(); + let array = TryFrom::try_from(bytes).unwrap(); let attr = u16::from_be_bytes(array); let attr = SchemaAttr::new(attr); Some(Ok((attr, bytes))) diff --git a/src/store/main.rs b/src/store/main.rs index d0cce80e6..870539f3e 100644 --- a/src/store/main.rs +++ b/src/store/main.rs @@ -1,7 +1,15 @@ use std::sync::Arc; -use crate::store::WORDS_KEY; +use std::convert::TryInto; + +use rkv::Value; use crate::RankedMap; +const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents"; +const RANKED_MAP_KEY: &str = "ranked-map"; +const SCHEMA_KEY: &str = "schema"; +const SYNONYMS_KEY: &str = "synonyms"; +const WORDS_KEY: &str = "words"; + #[derive(Copy, Clone)] pub struct Main { pub(crate) main: rkv::SingleStore, @@ -24,7 +32,7 @@ impl Main { ) -> Result, rkv::StoreError> { match self.main.get(reader, WORDS_KEY)? { - Some(rkv::Value::Blob(bytes)) => { + Some(Value::Blob(bytes)) => { let len = bytes.len(); let bytes = Arc::from(bytes); let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap(); @@ -41,23 +49,50 @@ impl Main { ranked_map: &RankedMap, ) -> Result<(), rkv::StoreError> { - unimplemented!() + let mut bytes = Vec::new(); + ranked_map.write_to_bin(&mut bytes).unwrap(); + let blob = Value::Blob(&bytes[..]); + self.main.put(writer, RANKED_MAP_KEY, &blob) } pub fn ranked_map( &self, reader: &T, - ) -> Result + ) -> Result, rkv::StoreError> { - unimplemented!() + match self.main.get(reader, RANKED_MAP_KEY)? { + Some(Value::Blob(bytes)) => { + let ranked_map = RankedMap::read_from_bin(bytes).unwrap(); + Ok(Some(ranked_map)) + }, + Some(value) => panic!("invalid type {:?}", value), + None => Ok(None), + } } pub fn put_number_of_documents u64>( &self, writer: &mut rkv::Writer, - func: F, - ) -> Result<(), rkv::StoreError> + f: F, + ) -> Result { - unimplemented!() + let new = self.number_of_documents(writer).map(f)?; + self.main.put(writer, NUMBER_OF_DOCUMENTS_KEY, &Value::Blob(&new.to_be_bytes()))?; + Ok(new) + } + + pub fn number_of_documents( + &self, + reader: &T, + ) -> Result + { + match self.main.get(reader, NUMBER_OF_DOCUMENTS_KEY)? { + Some(Value::Blob(bytes)) => { + let array = bytes.try_into().unwrap(); + Ok(u64::from_be_bytes(array)) + }, + Some(value) => panic!("invalid type {:?}", value), + None => Ok(0), + } } } diff --git a/src/store/mod.rs b/src/store/mod.rs index 49297b9ff..da946c499 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -4,6 +4,7 @@ mod main; mod postings_lists; mod synonyms; mod updates; +mod updates_results; pub use self::docs_words::DocsWords; pub use self::documents_fields::{DocumentsFields, DocumentFieldsIter}; @@ -11,12 +12,7 @@ pub use self::main::Main; pub use self::postings_lists::PostingsLists; pub use self::synonyms::Synonyms; pub use self::updates::Updates; - -const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents"; -const RANKED_MAP_KEY: &str = "ranked-map"; -const SCHEMA_KEY: &str = "schema"; -const SYNONYMS_KEY: &str = "synonyms"; -const WORDS_KEY: &str = "words"; +pub use self::updates_results::UpdatesResults; fn aligned_to(bytes: &[u8], align: usize) -> bool { (bytes as *const _ as *const () as usize) % align == 0 @@ -42,6 +38,10 @@ fn updates_name(name: &str) -> String { format!("{}-updates", name) } +fn updates_results_name(name: &str) -> String { + format!("{}-updates-results", name) +} + #[derive(Copy, Clone)] pub struct Index { pub main: Main, @@ -50,6 +50,7 @@ pub struct Index { pub synonyms: Synonyms, pub docs_words: DocsWords, pub updates: Updates, + pub updates_results: UpdatesResults, } pub fn create(env: &rkv::Rkv, name: &str) -> Result { @@ -75,6 +76,7 @@ fn open_options( let synonyms_name = synonyms_name(name); let docs_words_name = docs_words_name(name); let updates_name = updates_name(name); + let updates_results_name = updates_results_name(name); // open all the database names let main = env.open_single(main_name, options)?; @@ -83,6 +85,7 @@ fn open_options( let synonyms = env.open_single(synonyms_name.as_str(), options)?; let docs_words = env.open_single(docs_words_name.as_str(), options)?; let updates = env.open_single(updates_name.as_str(), options)?; + let updates_results = env.open_single(updates_results_name.as_str(), options)?; Ok(Index { main: Main { main }, @@ -91,5 +94,6 @@ fn open_options( synonyms: Synonyms { synonyms }, docs_words: DocsWords { docs_words }, updates: Updates { updates }, + updates_results: UpdatesResults { updates_results }, }) } diff --git a/src/store/updates.rs b/src/store/updates.rs index 6279c1f7e..124644e6f 100644 --- a/src/store/updates.rs +++ b/src/store/updates.rs @@ -13,14 +13,17 @@ impl Updates { ) -> Result { // let update = rmp_serde::to_vec_named(&addition)?; + + // WARN could not retrieve the last key/data entry of a tree... + // self.updates.get(writer, )?; + unimplemented!() } - pub fn alternatives_to( + pub fn pop_back( &self, - reader: &T, - word: &[u8], - ) -> Result, rkv::StoreError> + writer: &mut rkv::Writer, + ) -> Result, rkv::StoreError> { unimplemented!() } diff --git a/src/store/updates_results.rs b/src/store/updates_results.rs new file mode 100644 index 000000000..22bd35976 --- /dev/null +++ b/src/store/updates_results.rs @@ -0,0 +1,31 @@ +use crate::update::UpdateResult; + +#[derive(Copy, Clone)] +pub struct UpdatesResults { + pub(crate) updates_results: rkv::SingleStore, +} + +impl UpdatesResults { + pub fn put_update_result( + &self, + writer: &mut rkv::Writer, + update_id: u64, + update_result: &UpdateResult, + ) -> Result<(), rkv::StoreError> + { + // let update = rmp_serde::to_vec_named(&addition)?; + + // WARN could not retrieve the last key/data entry of a tree... + // self.updates.get(writer, )?; + + unimplemented!() + } + + pub fn update_result( + reader: &T, + update_id: u64, + ) -> Result, rkv::StoreError> + { + unimplemented!() + } +} diff --git a/src/update/mod.rs b/src/update/mod.rs index 85621a9fd..da560c521 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -4,10 +4,10 @@ mod documents_deletion; pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; +use std::time::Duration; use std::collections::BTreeMap; use serde::{Serialize, Deserialize}; -use crate::{store, DocumentId}; -use super::Error; +use crate::{store, Error, DocumentId}; #[derive(Serialize, Deserialize)] pub enum Update { @@ -17,6 +17,27 @@ pub enum Update { SynonymsDeletion(BTreeMap>>), } +#[derive(Clone, Serialize, Deserialize)] +pub enum UpdateType { + DocumentsAddition { number: usize }, + DocumentsDeletion { number: usize }, + SynonymsAddition { number: usize }, + SynonymsDeletion { number: usize }, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct DetailedDuration { + pub main: Duration, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct UpdateResult { + pub update_id: u64, + pub update_type: UpdateType, + pub result: Result<(), String>, + pub detailed_duration: DetailedDuration, +} + pub fn push_documents_addition( writer: &mut rkv::Writer, updates_store: store::Updates,