From 86f23d2695380d52ae84bee0c41ca8db1d34bd56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 21 Nov 2018 15:19:29 +0100 Subject: [PATCH] feat: Save the schema in the key-value store --- src/index/mod.rs | 61 +++++++++++++++++++++++------ src/index/update/mod.rs | 10 +---- src/index/update/negative_update.rs | 11 +++--- src/index/update/positive_update.rs | 15 +++---- 4 files changed, 63 insertions(+), 34 deletions(-) diff --git a/src/index/mod.rs b/src/index/mod.rs index 9a53b49ab..49cd24076 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -12,12 +12,14 @@ use std::path::{Path, PathBuf}; use std::collections::{BTreeSet, BTreeMap}; use fs2::FileExt; +use ::rocksdb::rocksdb::Writable; use ::rocksdb::{rocksdb, rocksdb_options}; use ::rocksdb::merge_operator::MergeOperands; use crate::rank::Document; use crate::data::DocIdsBuilder; use crate::{DocIndex, DocumentId}; +use crate::index::schema::Schema; use crate::index::update::Update; use crate::blob::{PositiveBlobBuilder, Blob, Sign}; use crate::blob::ordered_blobs_from_slice; @@ -25,6 +27,13 @@ use crate::tokenizer::{TokenizerBuilder, DefaultBuilder, Tokenizer}; use crate::rank::{criterion, Config, RankedStream}; use crate::automaton; +const DATA_PREFIX: &str = "data"; +const BLOB_PREFIX: &str = "blob"; +const DOCU_PREFIX: &str = "docu"; + +const DATA_BLOBS_ORDER: &str = "data-blobs-order"; +const DATA_SCHEMA: &str = "data-schema"; + fn simple_vec_append(key: &[u8], value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec { let mut output = Vec::new(); for bytes in operands.chain(value) { @@ -38,15 +47,18 @@ pub struct Index { } impl Index { - pub fn create>(path: P) -> Result> { - unimplemented!("return a soft error: the database already exist at the given path") + pub fn create>(path: P, schema: Schema) -> Result> { // Self::open must not take a parameter for create_if_missing // or we must create an OpenOptions with many parameters // https://doc.rust-lang.org/std/fs/struct.OpenOptions.html - } - pub fn open>(path: P) -> Result> { - let path = path.as_ref().to_string_lossy(); + let path = path.as_ref(); + if path.exists() { + return Err(format!("File already exists at path: {}, cannot create database.", + path.display()).into()) + } + + let path = path.to_string_lossy(); let mut opts = rocksdb_options::DBOptions::new(); opts.create_if_missing(true); @@ -55,8 +67,28 @@ impl Index { let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?; - // check if index is a valid RocksDB and - // contains the right key-values (i.e. "blobs-order") + let mut schema_bytes = Vec::new(); + schema.write_to(&mut schema_bytes)?; + database.put(DATA_SCHEMA.as_bytes(), &schema_bytes)?; + + Ok(Self { database }) + } + + pub fn open>(path: P) -> Result> { + let path = path.as_ref().to_string_lossy(); + + let mut opts = rocksdb_options::DBOptions::new(); + opts.create_if_missing(false); + + let mut cf_opts = rocksdb_options::ColumnFamilyOptions::new(); + cf_opts.add_merge_operator("blobs order operator", simple_vec_append); + + let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?; + + let _schema = match database.get(DATA_SCHEMA.as_bytes())? { + Some(value) => Schema::read_from(&*value)?, + None => return Err(String::from("Database does not contain a schema").into()), + }; Ok(Self { database }) } @@ -74,17 +106,20 @@ impl Index { Ok(()) } - fn blobs(&self) -> Result, Box> { - match self.database.get(b"00-blobs-order")? { - Some(value) => Ok(ordered_blobs_from_slice(&value)?), - None => Ok(Vec::new()), - } + pub fn schema(&self) -> Result> { + let bytes = self.database.get(DATA_SCHEMA.as_bytes())?.expect("data-schema entry not found"); + Ok(Schema::read_from(&*bytes).expect("Invalid schema")) } pub fn search(&self, query: &str) -> Result, Box> { + // this snapshot will allow consistent operations on documents + let snapshot = self.database.snapshot(); // FIXME create a SNAPSHOT for the search ! - let blobs = self.blobs()?; + let blobs = match snapshot.get(DATA_BLOBS_ORDER.as_bytes())? { + Some(value) => ordered_blobs_from_slice(&value)?, + None => Vec::new(), + }; let mut automatons = Vec::new(); for query in query.split_whitespace().map(str::to_lowercase) { diff --git a/src/index/update/mod.rs b/src/index/update/mod.rs index cc933ddbc..8f6f22329 100644 --- a/src/index/update/mod.rs +++ b/src/index/update/mod.rs @@ -12,11 +12,6 @@ mod positive_update; pub use self::negative_update::{NegativeUpdateBuilder}; pub use self::positive_update::{PositiveUpdateBuilder, NewState}; -// These prefixes are here to make sure the documents fields -// and the internal data doesn't collide and the internal data are -// at the top of the sst file. -const FIELD_BLOBS_ORDER: &str = "00-blobs-order"; - pub struct Update { path: PathBuf, } @@ -31,10 +26,7 @@ impl Update { file_writer.open(&path.to_string_lossy())?; let infos = file_writer.finish()?; - if infos.smallest_key() != FIELD_BLOBS_ORDER.as_bytes() { - // FIXME return a nice error - panic!("Invalid update file: the blobs-order field is not the smallest key") - } + // FIXME check if the update contains a blobs-order entry Ok(Update { path }) } diff --git a/src/index/update/negative_update.rs b/src/index/update/negative_update.rs index e90595a5c..1dfbea9b0 100644 --- a/src/index/update/negative_update.rs +++ b/src/index/update/negative_update.rs @@ -3,7 +3,8 @@ use std::error::Error; use ::rocksdb::rocksdb_options; -use crate::index::update::{FIELD_BLOBS_ORDER, Update}; +use crate::index::DATA_BLOBS_ORDER; +use crate::index::update::Update; use crate::index::blob_name::BlobName; use crate::data::DocIdsBuilder; use crate::DocumentId; @@ -40,16 +41,16 @@ impl NegativeUpdateBuilder { // write the blob name to be merged let blob_name = blob_name.to_string(); - file_writer.merge(FIELD_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; + file_writer.merge(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; // write the doc ids - let blob_key = format!("0b-{}-doc-ids", blob_name); + let blob_key = format!("BLOB-{}-doc-ids", blob_name); let blob_doc_ids = self.doc_ids.into_inner()?; file_writer.put(blob_key.as_bytes(), &blob_doc_ids)?; for id in blob_doc_ids { - let start = format!("5d-{}", id); - let end = format!("5d-{}", id + 1); + let start = format!("DOCU-{}", id); + let end = format!("DOCU-{}", id + 1); file_writer.delete_range(start.as_bytes(), end.as_bytes())?; } diff --git a/src/index/update/positive_update.rs b/src/index/update/positive_update.rs index a6eb3f5d2..57b7a0ee8 100644 --- a/src/index/update/positive_update.rs +++ b/src/index/update/positive_update.rs @@ -5,10 +5,11 @@ use std::fmt::Write; use ::rocksdb::rocksdb_options; -use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; -use crate::index::update::{FIELD_BLOBS_ORDER, Update}; -use crate::tokenizer::TokenizerBuilder; +use crate::index::DATA_BLOBS_ORDER; +use crate::index::update::Update; use crate::index::blob_name::BlobName; +use crate::index::schema::{SchemaProps, Schema, SchemaAttr}; +use crate::tokenizer::TokenizerBuilder; use crate::blob::PositiveBlobBuilder; use crate::{DocIndex, DocumentId}; @@ -66,7 +67,7 @@ where B: TokenizerBuilder // write the blob name to be merged let blob_name = blob_name.to_string(); - file_writer.put(FIELD_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; + file_writer.put(DATA_BLOBS_ORDER.as_bytes(), blob_name.as_bytes())?; let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); for ((document_id, field), state) in &self.new_states { @@ -96,15 +97,15 @@ where B: TokenizerBuilder let (blob_fst_map, blob_doc_idx) = builder.into_inner()?; // write the fst - let blob_key = format!("0b-{}-fst", blob_name); + let blob_key = format!("BLOB-{}-fst", blob_name); file_writer.put(blob_key.as_bytes(), &blob_fst_map)?; // write the doc-idx - let blob_key = format!("0b-{}-doc-idx", blob_name); + let blob_key = format!("BLOB-{}-doc-idx", blob_name); file_writer.put(blob_key.as_bytes(), &blob_doc_idx)?; // write all the documents fields updates - let mut key = String::from("5d-"); + let mut key = String::from("DOCU-"); let prefix_len = key.len(); for ((id, field), state) in self.new_states {