mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-25 06:00:08 +01:00
Made many stores do their jobs
This commit is contained in:
parent
c4bd13bcdf
commit
0a731973b9
@ -1,3 +1,5 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use rkv::Value;
|
||||||
use crate::DocumentId;
|
use crate::DocumentId;
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
@ -6,15 +8,6 @@ pub struct DocsWords {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DocsWords {
|
impl DocsWords {
|
||||||
pub fn doc_words<T: rkv::Readable>(
|
|
||||||
&self,
|
|
||||||
reader: &T,
|
|
||||||
document_id: DocumentId,
|
|
||||||
) -> Result<Option<fst::Set>, rkv::StoreError>
|
|
||||||
{
|
|
||||||
Ok(Some(fst::Set::default()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn put_doc_words(
|
pub fn put_doc_words(
|
||||||
&self,
|
&self,
|
||||||
writer: &mut rkv::Writer,
|
writer: &mut rkv::Writer,
|
||||||
@ -22,7 +15,9 @@ impl DocsWords {
|
|||||||
words: &fst::Set,
|
words: &fst::Set,
|
||||||
) -> Result<(), rkv::StoreError>
|
) -> Result<(), rkv::StoreError>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
let document_id_bytes = document_id.0.to_be_bytes();
|
||||||
|
let bytes = words.as_fst().as_bytes();
|
||||||
|
self.docs_words.put(writer, document_id_bytes, &Value::Blob(bytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn del_doc_words(
|
pub fn del_doc_words(
|
||||||
@ -34,4 +29,23 @@ impl DocsWords {
|
|||||||
let document_id_bytes = document_id.0.to_be_bytes();
|
let document_id_bytes = document_id.0.to_be_bytes();
|
||||||
self.docs_words.delete(writer, document_id_bytes)
|
self.docs_words.delete(writer, document_id_bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn doc_words<T: rkv::Readable>(
|
||||||
|
&self,
|
||||||
|
reader: &T,
|
||||||
|
document_id: DocumentId,
|
||||||
|
) -> Result<Option<fst::Set>, rkv::StoreError>
|
||||||
|
{
|
||||||
|
let document_id_bytes = document_id.0.to_be_bytes();
|
||||||
|
match self.docs_words.get(reader, document_id_bytes)? {
|
||||||
|
Some(Value::Blob(bytes)) => {
|
||||||
|
let len = bytes.len();
|
||||||
|
let bytes = Arc::from(bytes);
|
||||||
|
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
|
||||||
|
Ok(Some(fst::Set::from(fst)))
|
||||||
|
},
|
||||||
|
Some(value) => panic!("invalid type {:?}", value),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,35 +7,76 @@ pub struct DocumentsFields {
|
|||||||
pub(crate) documents_fields: rkv::SingleStore,
|
pub(crate) documents_fields: rkv::SingleStore,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DocumentsFields {
|
fn document_attribute_into_key(document_id: DocumentId, attribute: SchemaAttr) -> [u8; 10] {
|
||||||
pub fn del_all_document_fields(
|
let document_id_bytes = document_id.0.to_be_bytes();
|
||||||
&self,
|
let attr_bytes = attribute.0.to_be_bytes();
|
||||||
writer: &mut rkv::Writer,
|
|
||||||
document_id: DocumentId,
|
let mut key = [0u8; 10];
|
||||||
) -> Result<usize, rkv::StoreError>
|
key[0..8].copy_from_slice(&document_id_bytes);
|
||||||
{
|
key[8..10].copy_from_slice(&attr_bytes);
|
||||||
unimplemented!()
|
|
||||||
|
key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl DocumentsFields {
|
||||||
pub fn put_document_field(
|
pub fn put_document_field(
|
||||||
&self,
|
&self,
|
||||||
writer: &mut rkv::Writer,
|
writer: &mut rkv::Writer,
|
||||||
document_id: DocumentId,
|
document_id: DocumentId,
|
||||||
attribute: SchemaAttr,
|
attribute: SchemaAttr,
|
||||||
value: &[u8],
|
value: &[u8],
|
||||||
) -> Result<Option<&[u8]>, rkv::StoreError>
|
) -> Result<(), rkv::StoreError>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
let key = document_attribute_into_key(document_id, attribute);
|
||||||
|
self.documents_fields.put(writer, key, &rkv::Value::Blob(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn document_field<T: rkv::Readable>(
|
pub fn del_all_document_fields(
|
||||||
&self,
|
&self,
|
||||||
reader: &T,
|
writer: &mut rkv::Writer,
|
||||||
|
document_id: DocumentId,
|
||||||
|
) -> Result<usize, rkv::StoreError>
|
||||||
|
{
|
||||||
|
let document_id_bytes = document_id.0.to_be_bytes();
|
||||||
|
let mut keys_to_delete = Vec::new();
|
||||||
|
|
||||||
|
// WARN we can not delete the keys using the iterator
|
||||||
|
// so we store them and delete them just after
|
||||||
|
let iter = self.documents_fields.iter_from(writer, document_id_bytes)?;
|
||||||
|
for result in iter {
|
||||||
|
let (key, _) = result?;
|
||||||
|
let current_document_id = {
|
||||||
|
let bytes = key.get(0..8).unwrap();
|
||||||
|
let array = TryFrom::try_from(bytes).unwrap();
|
||||||
|
DocumentId(u64::from_be_bytes(array))
|
||||||
|
};
|
||||||
|
|
||||||
|
if current_document_id != document_id { break }
|
||||||
|
keys_to_delete.push(key.to_owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = keys_to_delete.len();
|
||||||
|
for key in keys_to_delete {
|
||||||
|
self.documents_fields.delete(writer, key)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn document_field<'a, T: rkv::Readable>(
|
||||||
|
&self,
|
||||||
|
reader: &'a T,
|
||||||
document_id: DocumentId,
|
document_id: DocumentId,
|
||||||
attribute: SchemaAttr,
|
attribute: SchemaAttr,
|
||||||
) -> Result<Option<&[u8]>, rkv::StoreError>
|
) -> Result<Option<&'a [u8]>, rkv::StoreError>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
let key = document_attribute_into_key(document_id, attribute);
|
||||||
|
|
||||||
|
match self.documents_fields.get(reader, key)? {
|
||||||
|
Some(rkv::Value::Blob(bytes)) => Ok(Some(bytes)),
|
||||||
|
Some(value) => panic!("invalid type {:?}", value),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn document_fields<'r, T: rkv::Readable>(
|
pub fn document_fields<'r, T: rkv::Readable>(
|
||||||
@ -63,7 +104,7 @@ impl<'r, T: rkv::Readable + 'r> Iterator for DocumentFieldsIter<'r, T> {
|
|||||||
match self.iter.next() {
|
match self.iter.next() {
|
||||||
Some(Ok((key, Some(rkv::Value::Blob(bytes))))) => {
|
Some(Ok((key, Some(rkv::Value::Blob(bytes))))) => {
|
||||||
let bytes = key.get(8..8+2).unwrap();
|
let bytes = key.get(8..8+2).unwrap();
|
||||||
let array = <[u8; 2]>::try_from(bytes).unwrap();
|
let array = TryFrom::try_from(bytes).unwrap();
|
||||||
let attr = u16::from_be_bytes(array);
|
let attr = u16::from_be_bytes(array);
|
||||||
let attr = SchemaAttr::new(attr);
|
let attr = SchemaAttr::new(attr);
|
||||||
Some(Ok((attr, bytes)))
|
Some(Ok((attr, bytes)))
|
||||||
|
@ -1,7 +1,15 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use crate::store::WORDS_KEY;
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
use rkv::Value;
|
||||||
use crate::RankedMap;
|
use crate::RankedMap;
|
||||||
|
|
||||||
|
const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
|
||||||
|
const RANKED_MAP_KEY: &str = "ranked-map";
|
||||||
|
const SCHEMA_KEY: &str = "schema";
|
||||||
|
const SYNONYMS_KEY: &str = "synonyms";
|
||||||
|
const WORDS_KEY: &str = "words";
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct Main {
|
pub struct Main {
|
||||||
pub(crate) main: rkv::SingleStore,
|
pub(crate) main: rkv::SingleStore,
|
||||||
@ -24,7 +32,7 @@ impl Main {
|
|||||||
) -> Result<Option<fst::Set>, rkv::StoreError>
|
) -> Result<Option<fst::Set>, rkv::StoreError>
|
||||||
{
|
{
|
||||||
match self.main.get(reader, WORDS_KEY)? {
|
match self.main.get(reader, WORDS_KEY)? {
|
||||||
Some(rkv::Value::Blob(bytes)) => {
|
Some(Value::Blob(bytes)) => {
|
||||||
let len = bytes.len();
|
let len = bytes.len();
|
||||||
let bytes = Arc::from(bytes);
|
let bytes = Arc::from(bytes);
|
||||||
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
|
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
|
||||||
@ -41,23 +49,50 @@ impl Main {
|
|||||||
ranked_map: &RankedMap,
|
ranked_map: &RankedMap,
|
||||||
) -> Result<(), rkv::StoreError>
|
) -> Result<(), rkv::StoreError>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
let mut bytes = Vec::new();
|
||||||
|
ranked_map.write_to_bin(&mut bytes).unwrap();
|
||||||
|
let blob = Value::Blob(&bytes[..]);
|
||||||
|
self.main.put(writer, RANKED_MAP_KEY, &blob)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ranked_map<T: rkv::Readable>(
|
pub fn ranked_map<T: rkv::Readable>(
|
||||||
&self,
|
&self,
|
||||||
reader: &T,
|
reader: &T,
|
||||||
) -> Result<RankedMap, rkv::StoreError>
|
) -> Result<Option<RankedMap>, rkv::StoreError>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
match self.main.get(reader, RANKED_MAP_KEY)? {
|
||||||
|
Some(Value::Blob(bytes)) => {
|
||||||
|
let ranked_map = RankedMap::read_from_bin(bytes).unwrap();
|
||||||
|
Ok(Some(ranked_map))
|
||||||
|
},
|
||||||
|
Some(value) => panic!("invalid type {:?}", value),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_number_of_documents<F: Fn(u64) -> u64>(
|
pub fn put_number_of_documents<F: Fn(u64) -> u64>(
|
||||||
&self,
|
&self,
|
||||||
writer: &mut rkv::Writer,
|
writer: &mut rkv::Writer,
|
||||||
func: F,
|
f: F,
|
||||||
) -> Result<(), rkv::StoreError>
|
) -> Result<u64, rkv::StoreError>
|
||||||
{
|
{
|
||||||
unimplemented!()
|
let new = self.number_of_documents(writer).map(f)?;
|
||||||
|
self.main.put(writer, NUMBER_OF_DOCUMENTS_KEY, &Value::Blob(&new.to_be_bytes()))?;
|
||||||
|
Ok(new)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn number_of_documents<T: rkv::Readable>(
|
||||||
|
&self,
|
||||||
|
reader: &T,
|
||||||
|
) -> Result<u64, rkv::StoreError>
|
||||||
|
{
|
||||||
|
match self.main.get(reader, NUMBER_OF_DOCUMENTS_KEY)? {
|
||||||
|
Some(Value::Blob(bytes)) => {
|
||||||
|
let array = bytes.try_into().unwrap();
|
||||||
|
Ok(u64::from_be_bytes(array))
|
||||||
|
},
|
||||||
|
Some(value) => panic!("invalid type {:?}", value),
|
||||||
|
None => Ok(0),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ mod main;
|
|||||||
mod postings_lists;
|
mod postings_lists;
|
||||||
mod synonyms;
|
mod synonyms;
|
||||||
mod updates;
|
mod updates;
|
||||||
|
mod updates_results;
|
||||||
|
|
||||||
pub use self::docs_words::DocsWords;
|
pub use self::docs_words::DocsWords;
|
||||||
pub use self::documents_fields::{DocumentsFields, DocumentFieldsIter};
|
pub use self::documents_fields::{DocumentsFields, DocumentFieldsIter};
|
||||||
@ -11,12 +12,7 @@ pub use self::main::Main;
|
|||||||
pub use self::postings_lists::PostingsLists;
|
pub use self::postings_lists::PostingsLists;
|
||||||
pub use self::synonyms::Synonyms;
|
pub use self::synonyms::Synonyms;
|
||||||
pub use self::updates::Updates;
|
pub use self::updates::Updates;
|
||||||
|
pub use self::updates_results::UpdatesResults;
|
||||||
const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
|
|
||||||
const RANKED_MAP_KEY: &str = "ranked-map";
|
|
||||||
const SCHEMA_KEY: &str = "schema";
|
|
||||||
const SYNONYMS_KEY: &str = "synonyms";
|
|
||||||
const WORDS_KEY: &str = "words";
|
|
||||||
|
|
||||||
fn aligned_to(bytes: &[u8], align: usize) -> bool {
|
fn aligned_to(bytes: &[u8], align: usize) -> bool {
|
||||||
(bytes as *const _ as *const () as usize) % align == 0
|
(bytes as *const _ as *const () as usize) % align == 0
|
||||||
@ -42,6 +38,10 @@ fn updates_name(name: &str) -> String {
|
|||||||
format!("{}-updates", name)
|
format!("{}-updates", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn updates_results_name(name: &str) -> String {
|
||||||
|
format!("{}-updates-results", name)
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct Index {
|
pub struct Index {
|
||||||
pub main: Main,
|
pub main: Main,
|
||||||
@ -50,6 +50,7 @@ pub struct Index {
|
|||||||
pub synonyms: Synonyms,
|
pub synonyms: Synonyms,
|
||||||
pub docs_words: DocsWords,
|
pub docs_words: DocsWords,
|
||||||
pub updates: Updates,
|
pub updates: Updates,
|
||||||
|
pub updates_results: UpdatesResults,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create(env: &rkv::Rkv, name: &str) -> Result<Index, rkv::StoreError> {
|
pub fn create(env: &rkv::Rkv, name: &str) -> Result<Index, rkv::StoreError> {
|
||||||
@ -75,6 +76,7 @@ fn open_options(
|
|||||||
let synonyms_name = synonyms_name(name);
|
let synonyms_name = synonyms_name(name);
|
||||||
let docs_words_name = docs_words_name(name);
|
let docs_words_name = docs_words_name(name);
|
||||||
let updates_name = updates_name(name);
|
let updates_name = updates_name(name);
|
||||||
|
let updates_results_name = updates_results_name(name);
|
||||||
|
|
||||||
// open all the database names
|
// open all the database names
|
||||||
let main = env.open_single(main_name, options)?;
|
let main = env.open_single(main_name, options)?;
|
||||||
@ -83,6 +85,7 @@ fn open_options(
|
|||||||
let synonyms = env.open_single(synonyms_name.as_str(), options)?;
|
let synonyms = env.open_single(synonyms_name.as_str(), options)?;
|
||||||
let docs_words = env.open_single(docs_words_name.as_str(), options)?;
|
let docs_words = env.open_single(docs_words_name.as_str(), options)?;
|
||||||
let updates = env.open_single(updates_name.as_str(), options)?;
|
let updates = env.open_single(updates_name.as_str(), options)?;
|
||||||
|
let updates_results = env.open_single(updates_results_name.as_str(), options)?;
|
||||||
|
|
||||||
Ok(Index {
|
Ok(Index {
|
||||||
main: Main { main },
|
main: Main { main },
|
||||||
@ -91,5 +94,6 @@ fn open_options(
|
|||||||
synonyms: Synonyms { synonyms },
|
synonyms: Synonyms { synonyms },
|
||||||
docs_words: DocsWords { docs_words },
|
docs_words: DocsWords { docs_words },
|
||||||
updates: Updates { updates },
|
updates: Updates { updates },
|
||||||
|
updates_results: UpdatesResults { updates_results },
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -13,14 +13,17 @@ impl Updates {
|
|||||||
) -> Result<u64, rkv::StoreError>
|
) -> Result<u64, rkv::StoreError>
|
||||||
{
|
{
|
||||||
// let update = rmp_serde::to_vec_named(&addition)?;
|
// let update = rmp_serde::to_vec_named(&addition)?;
|
||||||
|
|
||||||
|
// WARN could not retrieve the last key/data entry of a tree...
|
||||||
|
// self.updates.get(writer, )?;
|
||||||
|
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn alternatives_to<T: rkv::Readable>(
|
pub fn pop_back(
|
||||||
&self,
|
&self,
|
||||||
reader: &T,
|
writer: &mut rkv::Writer,
|
||||||
word: &[u8],
|
) -> Result<Option<(u64, Update)>, rkv::StoreError>
|
||||||
) -> Result<Option<fst::Set>, rkv::StoreError>
|
|
||||||
{
|
{
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
31
src/store/updates_results.rs
Normal file
31
src/store/updates_results.rs
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
use crate::update::UpdateResult;
|
||||||
|
|
||||||
|
#[derive(Copy, Clone)]
|
||||||
|
pub struct UpdatesResults {
|
||||||
|
pub(crate) updates_results: rkv::SingleStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdatesResults {
|
||||||
|
pub fn put_update_result(
|
||||||
|
&self,
|
||||||
|
writer: &mut rkv::Writer,
|
||||||
|
update_id: u64,
|
||||||
|
update_result: &UpdateResult,
|
||||||
|
) -> Result<(), rkv::StoreError>
|
||||||
|
{
|
||||||
|
// let update = rmp_serde::to_vec_named(&addition)?;
|
||||||
|
|
||||||
|
// WARN could not retrieve the last key/data entry of a tree...
|
||||||
|
// self.updates.get(writer, )?;
|
||||||
|
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_result<T: rkv::Readable>(
|
||||||
|
reader: &T,
|
||||||
|
update_id: u64,
|
||||||
|
) -> Result<Option<UpdateResult>, rkv::StoreError>
|
||||||
|
{
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
@ -4,10 +4,10 @@ mod documents_deletion;
|
|||||||
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
|
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
|
||||||
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
|
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
use crate::{store, DocumentId};
|
use crate::{store, Error, DocumentId};
|
||||||
use super::Error;
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub enum Update {
|
pub enum Update {
|
||||||
@ -17,6 +17,27 @@ pub enum Update {
|
|||||||
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
|
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize, Deserialize)]
|
||||||
|
pub enum UpdateType {
|
||||||
|
DocumentsAddition { number: usize },
|
||||||
|
DocumentsDeletion { number: usize },
|
||||||
|
SynonymsAddition { number: usize },
|
||||||
|
SynonymsDeletion { number: usize },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize, Deserialize)]
|
||||||
|
pub struct DetailedDuration {
|
||||||
|
pub main: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Serialize, Deserialize)]
|
||||||
|
pub struct UpdateResult {
|
||||||
|
pub update_id: u64,
|
||||||
|
pub update_type: UpdateType,
|
||||||
|
pub result: Result<(), String>,
|
||||||
|
pub detailed_duration: DetailedDuration,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn push_documents_addition<D: serde::Serialize>(
|
pub fn push_documents_addition<D: serde::Serialize>(
|
||||||
writer: &mut rkv::Writer,
|
writer: &mut rkv::Writer,
|
||||||
updates_store: store::Updates,
|
updates_store: store::Updates,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user