feat: Implement the PositiveUpdate

This commit is contained in:
Clément Renault 2018-11-29 16:28:10 +01:00
parent af791db23d
commit ec5d17e8c2
No known key found for this signature in database
GPG Key ID: 0151CDAB43460DAE
4 changed files with 42 additions and 29 deletions

View File

@ -14,8 +14,11 @@ use crate::index::update::Update;
use crate::rank::QueryBuilder; use crate::rank::QueryBuilder;
use crate::blob::{self, Blob}; use crate::blob::{self, Blob};
const DATA_INDEX: &[u8] = b"data-index";
const DATA_SCHEMA: &[u8] = b"data-schema";
fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> { fn merge_indexes(key: &[u8], existing_value: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
if key != b"data-index" { panic!("The merge operator only supports \"data-index\" merging") } if key != DATA_INDEX { panic!("The merge operator only supports \"data-index\" merging") }
let capacity = { let capacity = {
let remaining = operands.size_hint().0; let remaining = operands.size_hint().0;
@ -65,7 +68,7 @@ impl Index {
let mut schema_bytes = Vec::new(); let mut schema_bytes = Vec::new();
schema.write_to(&mut schema_bytes)?; schema.write_to(&mut schema_bytes)?;
database.put(b"data-schema", &schema_bytes)?; database.put(DATA_SCHEMA, &schema_bytes)?;
Ok(Self { database }) Ok(Self { database })
} }
@ -81,7 +84,7 @@ impl Index {
let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?; let database = rocksdb::DB::open_cf(opts, &path, vec![("default", cf_opts)])?;
let _schema = match database.get(b"data-schema")? { let _schema = match database.get(DATA_SCHEMA)? {
Some(value) => Schema::read_from(&*value)?, Some(value) => Schema::read_from(&*value)?,
None => return Err(String::from("Database does not contain a schema").into()), None => return Err(String::from("Database does not contain a schema").into()),
}; };
@ -103,7 +106,7 @@ impl Index {
} }
pub fn schema(&self) -> Result<Schema, Box<Error>> { pub fn schema(&self) -> Result<Schema, Box<Error>> {
let bytes = self.database.get(b"data-schema")?.expect("data-schema entry not found"); let bytes = self.database.get(DATA_SCHEMA)?.expect("data-schema entry not found");
Ok(Schema::read_from(&*bytes).expect("Invalid schema")) Ok(Schema::read_from(&*bytes).expect("Invalid schema"))
} }

View File

@ -1,12 +1,27 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::error::Error; use std::error::Error;
use std::io::{Cursor, Write};
use byteorder::{NetworkEndian, WriteBytesExt};
use ::rocksdb::rocksdb_options; use ::rocksdb::rocksdb_options;
use crate::data::{DocIds, DocIdsBuilder};
use crate::blob::{Blob, NegativeBlob};
use crate::index::update::Update; use crate::index::update::Update;
use crate::data::DocIdsBuilder; use crate::index::DATA_INDEX;
use crate::DocumentId; use crate::DocumentId;
const DOC_KEY_LEN: usize = 4 + std::mem::size_of::<u64>();
// "doc-ID_8_BYTES"
fn raw_document_key(id: DocumentId) -> [u8; DOC_KEY_LEN] {
let mut key = [0; DOC_KEY_LEN];
let mut rdr = Cursor::new(&mut key[..]);
rdr.write_all(b"doc-").unwrap();
rdr.write_u64::<NetworkEndian>(id).unwrap();
key
}
pub struct NegativeUpdateBuilder { pub struct NegativeUpdateBuilder {
path: PathBuf, path: PathBuf,
doc_ids: DocIdsBuilder<Vec<u8>>, doc_ids: DocIdsBuilder<Vec<u8>>,
@ -30,29 +45,27 @@ impl NegativeUpdateBuilder {
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?; file_writer.open(&self.path.to_string_lossy())?;
// // write the doc ids // write the data-index aka negative blob
// let blob_key = Identifier::blob(blob_info.name).document_ids().build(); let bytes = self.doc_ids.into_inner()?;
// let blob_doc_ids = self.doc_ids.into_inner()?; let doc_ids = DocIds::from_bytes(bytes)?;
// file_writer.put(&blob_key, &blob_doc_ids)?; let blob = Blob::Negative(NegativeBlob::from_raw(doc_ids));
let bytes = bincode::serialize(&blob)?;
file_writer.merge(DATA_INDEX, &bytes);
// { // FIXME remove this ugly thing !
// // write the blob name to be merged // let Blob::Negative(negative_blob) = blob;
// let mut buffer = Vec::new(); let negative_blob = match blob {
// blob_info.write_into(&mut buffer); Blob::Negative(blob) => blob,
// let data_key = Identifier::data().blobs_order().build(); Blob::Positive(_) => unreachable!(),
// file_writer.merge(&data_key, &buffer)?; };
// }
// let blob_doc_ids = DocIds::from_bytes(blob_doc_ids)?; for &document_id in negative_blob.as_ref() {
// for id in blob_doc_ids.doc_ids().iter().cloned() { let start = raw_document_key(document_id);
// let start = Identifier::document(id).build(); let end = raw_document_key(document_id + 1);
// let end = Identifier::document(id + 1).build(); file_writer.delete_range(&start, &end)?;
// file_writer.delete_range(&start, &end)?; }
// }
// file_writer.finish()?; file_writer.finish()?;
// Update::open(self.path) Update::open(self.path)
unimplemented!()
} }
} }

View File

@ -52,7 +52,6 @@ where B: TokenizerBuilder
let env_options = rocksdb_options::EnvOptions::new(); let env_options = rocksdb_options::EnvOptions::new();
let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); let column_family_options = rocksdb_options::ColumnFamilyOptions::new();
let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options);
file_writer.open(&self.path.to_string_lossy())?; file_writer.open(&self.path.to_string_lossy())?;
// let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new()); // let mut builder = PositiveBlobBuilder::new(Vec::new(), Vec::new());

View File

@ -1,5 +1,3 @@
#![feature(range_contains)]
#[macro_use] extern crate lazy_static; #[macro_use] extern crate lazy_static;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;