diff --git a/Cargo.toml b/Cargo.toml index 572cbf2aa..9be65a7b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,11 +49,6 @@ rand_xorshift = "0.1" structopt = "0.2" tempfile = "3.0" termcolor = "1.0" -warp = "0.1" - -[dev-dependencies.chashmap] -git = "https://gitlab.redox-os.org/redox-os/tfs.git" -rev = "b3e7cae1" [profile.release] debug = true diff --git a/examples/create-database.rs b/examples/create-database.rs index 4ba16a2a6..b00fbf4b5 100644 --- a/examples/create-database.rs +++ b/examples/create-database.rs @@ -11,7 +11,7 @@ use std::fs::File; use serde_derive::{Serialize, Deserialize}; use structopt::StructOpt; -use meilidb::database::{Database, Schema, UpdateBuilder}; +use meilidb::database::{Database, Schema}; use meilidb::tokenizer::DefaultBuilder; #[derive(Debug, StructOpt)] @@ -61,8 +61,7 @@ fn index( while !end_of_file { let tokenizer_builder = DefaultBuilder::new(); - let update_path = tempfile::NamedTempFile::new()?; - let mut update = UpdateBuilder::new(update_path.path().to_path_buf(), schema.clone()); + let mut update = database.update()?; loop { end_of_file = !rdr.read_record(&mut raw_record)?; @@ -88,10 +87,8 @@ fn index( println!(); - println!("building update..."); - let update = update.build()?; - println!("ingesting update..."); - database.ingest_update_file(update)?; + println!("committing update..."); + update.commit()?; } Ok(database) diff --git a/examples/http-server.rs b/examples/http-server.rs deleted file mode 100644 index 315c813ac..000000000 --- a/examples/http-server.rs +++ /dev/null @@ -1,431 +0,0 @@ -#[global_allocator] -static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; - -use std::collections::{HashMap, HashSet}; -use std::error::Error; -use std::ffi::OsStr; -use std::fmt; -use std::fs::{self, File}; -use std::io::{self, BufRead, BufReader}; -use std::net::SocketAddr; -use std::path::{PathBuf, Path}; -use std::sync::Arc; -use std::time::SystemTime; - -use chashmap::{CHashMap, ReadGuard}; -use elapsed::measure_time; -use meilidb::database::{Database, UpdateBuilder}; -use meilidb::database::schema::{Schema, SchemaBuilder}; -use meilidb::tokenizer::DefaultBuilder; -use serde_derive::{Serialize, Deserialize}; -use structopt::StructOpt; -use warp::{Rejection, Filter}; -use log::{error, info}; - -#[derive(Debug, StructOpt)] -pub struct Opt { - /// The destination where the database must be created. - #[structopt(parse(from_os_str))] - pub database_path: PathBuf, - - /// The address and port to bind the server to. - #[structopt(short = "l", default_value = "127.0.0.1:8080")] - pub listen_addr: SocketAddr, - - /// The path to the list of stop words (one by line). - #[structopt(long = "stop-words", parse(from_os_str))] - pub stop_words: PathBuf, -} - -// -// ERRORS FOR THE MULTIDATABASE -// - -#[derive(Debug)] -pub enum DatabaseError { - AlreadyExist, - NotExist, - NotFound(String), - Unknown(Box), -} - -impl fmt::Display for DatabaseError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - DatabaseError::AlreadyExist => write!(f, "File already exist"), - DatabaseError::NotExist => write!(f, "File not exist"), - DatabaseError::NotFound(ref name) => write!(f, "Database {} not found", name), - DatabaseError::Unknown(e) => write!(f, "{}", e), - } - } -} - -impl Error for DatabaseError {} - -impl From> for DatabaseError { - fn from(e: Box) -> DatabaseError { - DatabaseError::Unknown(e) - } -} - -// -// MULTIDATABASE DEFINITION -// - -pub struct MultiDatabase { - databases: CHashMap, - db_path: PathBuf, - stop_words: HashSet, -} - -impl MultiDatabase { - - pub fn new(path: PathBuf, stop_words: HashSet) -> MultiDatabase { - MultiDatabase { - databases: CHashMap::new(), - db_path: path, - stop_words: stop_words - } - } - - pub fn create(&self, name: String, schema: Schema) -> Result<(), DatabaseError> { - let rdb_name = format!("{}.mdb", name); - let database_path = self.db_path.join(rdb_name); - - if database_path.exists() { - return Err(DatabaseError::AlreadyExist.into()); - } - - let index = Database::create(database_path, &schema)?; - - self.databases.insert_new(name, index); - - Ok(()) - } - - pub fn load(&self, name: String) -> Result<(), DatabaseError> { - let rdb_name = format!("{}.mdb", name); - let index_path = self.db_path.join(rdb_name); - - if !index_path.exists() { - return Err(DatabaseError::NotExist.into()); - } - - let index = Database::open(index_path)?; - - self.databases.insert_new(name, index); - - Ok(()) - } - - pub fn load_existing(&self) { - let paths = match fs::read_dir(self.db_path.clone()){ - Ok(p) => p, - Err(e) => { - error!("{}", e); - return - } - }; - - for path in paths { - let path = match path { - Ok(p) => p.path(), - Err(_) => continue - }; - - let path_str = match path.to_str() { - Some(p) => p, - None => continue - }; - - let extension = match get_extension_from_path(path_str) { - Some(e) => e, - None => continue - }; - - if extension != "mdb" { - continue - } - - let name = match get_file_name_from_path(path_str) { - Some(f) => f, - None => continue - }; - - let db = match Database::open(path.clone()) { - Ok(db) => db, - Err(_) => continue - }; - - self.databases.insert_new(name.to_string(), db); - info!("Load database {}", name); - } - } - - pub fn create_or_load(&self, name: String, schema: Schema) -> Result<(), DatabaseError> { - match self.create(name.clone(), schema) { - Err(DatabaseError::AlreadyExist) => self.load(name), - x => x, - } - } - - pub fn get(&self, name: String) -> Result, Box> { - Ok(self.databases.get(&name).ok_or(DatabaseError::NotFound(name))?) - } -} - -fn get_extension_from_path(path: &str) -> Option<&str> { - Path::new(path).extension().and_then(OsStr::to_str) -} - -fn get_file_name_from_path(path: &str) -> Option<&str> { - Path::new(path).file_stem().and_then(OsStr::to_str) -} - -fn retrieve_stop_words(path: &Path) -> io::Result> { - let f = File::open(path)?; - let reader = BufReader::new(f); - let mut words = HashSet::new(); - - for line in reader.lines() { - let line = line?; - let word = line.trim().to_string(); - words.insert(word); - } - - Ok(words) -} - -// -// PARAMS & BODY FOR HTTPS HANDLERS -// - -#[derive(Deserialize)] -struct CreateBody { - name: String, - schema: SchemaBuilder, -} - -#[derive(Deserialize)] -struct IngestBody { - insert: Option>>, - delete: Option>> -} - -#[derive(Serialize)] -struct IngestResponse { - inserted: usize, - deleted: usize -} - -#[derive(Deserialize)] -struct SearchQuery { - q: String, - limit: Option, -} - -// -// HTTP ROUTES -// - -// Create a new index. -// The index name should be unused and the schema valid. -// -// POST /create -// Body: -// - name: String -// - schema: JSON -// - stopwords: Vec -fn create(body: CreateBody, db: Arc) -> Result { - let schema = body.schema.build(); - - match db.create(body.name.clone(), schema) { - Ok(_) => Ok(format!("{} created ", body.name)), - Err(e) => { - error!("{:?}", e); - return Err(warp::reject::not_found()) - } - } -} - -// Ingest new document. -// It's possible to have positive or/and negative updates. -// -// PUT /:name/ingest -// Body: -// - insert: Option> -// - delete: Option> -fn ingest(index_name: String, body: IngestBody, db: Arc) -> Result { - - let schema = { - let index = match db.get(index_name.clone()){ - Ok(i) => i, - Err(_) => return Err(warp::reject::not_found()), - }; - let view = index.view(); - - view.schema().clone() - }; - - let tokenizer_builder = DefaultBuilder::new(); - let now = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { - Ok(n) => n.as_secs(), - Err(_) => panic!("SystemTime before UNIX EPOCH!"), - }; - - let sst_name = format!("update-{}-{}.sst", index_name, now); - let sst_path = db.db_path.join(sst_name); - - let mut response = IngestResponse{inserted: 0, deleted: 0}; - let mut update = UpdateBuilder::new(sst_path, schema); - - if let Some(documents) = body.delete { - for doc in documents { - if let Err(e) = update.remove_document(doc) { - error!("Impossible to remove document; {:?}", e); - } else { - response.deleted += 1; - } - } - } - - let stop_words = &db.stop_words; - if let Some(documents) = body.insert { - for doc in documents { - if let Err(e) = update.update_document(doc, &tokenizer_builder, &stop_words) { - error!("Impossible to update document; {:?}", e); - } else { - response.inserted += 1; - } - } - } - - - let update = match update.build() { - Ok(u) => u, - Err(e) => { - error!("Impossible to create an update file; {:?}", e); - return Err(warp::reject::not_found()) - } - }; - - { - let index = match db.get(index_name.clone()){ - Ok(i) => i, - Err(_) => return Err(warp::reject::not_found()), - }; - - if let Err(e) = index.ingest_update_file(update) { - error!("Impossible to ingest sst file; {:?}", e); - return Err(warp::reject::not_found()) - }; - } - - if let Ok(response) = serde_json::to_string(&response) { - return Ok(response); - }; - - return Err(warp::reject::not_found()) -} - -// Search in a specific index -// The default limit is 20 -// -// GET /:name/search -// Params: -// - query: String -// - limit: Option -fn search(index_name: String, query: SearchQuery, db: Arc) -> Result { - - let view = { - let index = match db.get(index_name.clone()){ - Ok(i) => i, - Err(_) => return Err(warp::reject::not_found()), - }; - index.view() - }; - - let limit = query.limit.unwrap_or(20); - - let query_builder = match view.query_builder() { - Ok(q) => q, - Err(_err) => return Err(warp::reject::not_found()), - }; - - let (time, responses) = measure_time(|| { - let docs = query_builder.query(&query.q, 0..limit); - let mut results: Vec> = Vec::with_capacity(limit); - for doc in docs { - match view.document_by_id(doc.id) { - Ok(val) => results.push(val), - Err(e) => println!("{:?}", e), - } - } - results - }); - - let response = match serde_json::to_string(&responses) { - Ok(val) => val, - Err(err) => format!("{:?}", err), - }; - - info!("index: {} - search: {:?} - limit: {} - time: {}", index_name, query.q, limit, time); - Ok(response) -} - -fn start_server(listen_addr: SocketAddr, db: Arc) { - let index_path = warp::path("index").and(warp::path::param::()); - let db = warp::any().map(move || db.clone()); - - let create_path = warp::path("create").and(warp::path::end()); - let ingest_path = index_path.and(warp::path("ingest")).and(warp::path::end()); - let search_path = index_path.and(warp::path("search")).and(warp::path::end()); - - let create = warp::post2() - .and(create_path) - .and(warp::body::json()) - .and(db.clone()) - .and_then(create); - - let ingest = warp::put2() - .and(ingest_path) - .and(warp::body::json()) - .and(db.clone()) - .and_then(ingest); - - let search = warp::get2() - .and(search_path) - .and(warp::query()) - .and(db.clone()) - .and_then(search); - - let api = create - .or(ingest) - .or(search); - - let logs = warp::log("server"); - let headers = warp::reply::with::header("Content-Type", "application/json"); - - let routes = api.with(logs).with(headers); - - info!("Server is started on {}", listen_addr); - warp::serve(routes).run(listen_addr); -} - -fn main() { - env_logger::init(); - let opt = Opt::from_args(); - - let stop_words = match retrieve_stop_words(&opt.stop_words) { - Ok(s) => s, - Err(_) => HashSet::new(), - }; - - let db = Arc::new(MultiDatabase::new(opt.database_path.clone(), stop_words)); - - db.load_existing(); - - start_server(opt.listen_addr, db); -} - - diff --git a/src/database/document_key.rs b/src/database/document_key.rs index 77f2ee43f..52fd428f8 100644 --- a/src/database/document_key.rs +++ b/src/database/document_key.rs @@ -38,6 +38,10 @@ impl DocumentKey { DocumentKeyAttr::new(self.document_id(), attr) } + pub fn with_attribute_min(&self) -> DocumentKeyAttr { + DocumentKeyAttr::new(self.document_id(), SchemaAttr::min()) + } + pub fn with_attribute_max(&self) -> DocumentKeyAttr { DocumentKeyAttr::new(self.document_id(), SchemaAttr::max()) } diff --git a/src/database/mod.rs b/src/database/mod.rs index 4bfaeaa4a..f9fb0af04 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,17 +1,17 @@ -use std::sync::{Arc, Mutex}; use std::error::Error; -use std::ops::Deref; use std::path::Path; +use std::ops::Deref; +use std::sync::Arc; -use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions}; +use rocksdb::rocksdb_options::{DBOptions, ColumnFamilyOptions}; use rocksdb::rocksdb::{Writable, Snapshot}; -use rocksdb::{DB, DBVector, MergeOperands}; +use rocksdb::{DB, MergeOperands}; use crossbeam::atomic::ArcCell; use log::info; pub use self::document_key::{DocumentKey, DocumentKeyAttr}; pub use self::view::{DatabaseView, DocumentIter}; -pub use self::update::{Update, UpdateBuilder}; +pub use self::update::Update; pub use self::serde::SerializerError; pub use self::schema::Schema; pub use self::index::Index; @@ -78,11 +78,7 @@ fn merge_indexes(key: &[u8], existing: Option<&[u8]>, operands: &mut MergeOperan } pub struct Database { - // DB is under a Mutex to sync update ingestions and separate DB update locking - // and DatabaseView acquiring locking in other words: - // "Block readers the minimum possible amount of time" - db: Mutex>, - + db: Arc, // This view is updated each time the DB ingests an update view: ArcCell>>, } @@ -113,7 +109,7 @@ impl Database { let snapshot = Snapshot::new(db.clone()); let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?)); - Ok(Database { db: Mutex::new(db), view }) + Ok(Database { db, view }) } pub fn open>(path: P) -> Result> { @@ -137,49 +133,16 @@ impl Database { let snapshot = Snapshot::new(db.clone()); let view = ArcCell::new(Arc::new(DatabaseView::new(snapshot)?)); - Ok(Database { db: Mutex::new(db), view }) + Ok(Database { db, view }) } - pub fn ingest_update_file(&self, update: Update) -> Result>>, Box> { - let snapshot = { - // We must have a mutex here to ensure that update ingestions and compactions - // are done atomatically and in the right order. - // This way update ingestions will block other update ingestions without blocking view - // creations while doing the "data-index" compaction - let db = match self.db.lock() { - Ok(db) => db, - Err(e) => return Err(e.to_string().into()), - }; - - let path = update.path().to_string_lossy(); - let options = IngestExternalFileOptions::new(); - // options.move_files(move_update); - - let (elapsed, result) = elapsed::measure_time(|| { - let cf_handle = db.cf_handle("default").expect("\"default\" column family not found"); - db.ingest_external_file_optimized(&cf_handle, &options, &[&path]) - }); - let _ = result?; - info!("ingesting update file took {}", elapsed); - - Snapshot::new(db.clone()) + pub fn update(&self) -> Result> { + let schema = match self.db.get(DATA_SCHEMA)? { + Some(value) => Schema::read_from_bin(&*value)?, + None => panic!("Database does not contain a schema"), }; - let view = Arc::new(DatabaseView::new(snapshot)?); - self.view.set(view.clone()); - - Ok(view) - } - - pub fn get(&self, key: &[u8]) -> Result, Box> { - self.view().get(key) - } - - pub fn flush(&self) -> Result<(), Box> { - match self.db.lock() { - Ok(db) => Ok(db.flush(true)?), - Err(e) => Err(e.to_string().into()), - } + Ok(Update::new(self, schema)) } pub fn view(&self) -> Arc>> { @@ -193,20 +156,18 @@ mod tests { use std::error::Error; use serde_derive::{Serialize, Deserialize}; - use tempfile::tempdir; use crate::database::schema::{SchemaBuilder, STORED, INDEXED}; - use crate::database::update::UpdateBuilder; use crate::tokenizer::DefaultBuilder; use super::*; #[test] - fn ingest_one_update_file() -> Result<(), Box> { - let dir = tempdir()?; + fn ingest_one_easy_update() -> Result<(), Box> { + let dir = tempfile::tempdir()?; let stop_words = HashSet::new(); - let rocksdb_path = dir.path().join("rocksdb.rdb"); + let meilidb_path = dir.path().join("meilidb.mdb"); #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] struct SimpleDoc { @@ -225,9 +186,7 @@ mod tests { builder.build() }; - let database = Database::create(&rocksdb_path, &schema)?; - - let update_path = dir.path().join("update.sst"); + let database = Database::create(&meilidb_path, &schema)?; let doc0 = SimpleDoc { id: 0, @@ -242,20 +201,13 @@ mod tests { timestamp: 7654321, }; - let docid0; - let docid1; - let update = { - let tokenizer_builder = DefaultBuilder::new(); - let mut builder = UpdateBuilder::new(update_path, schema); + let tokenizer_builder = DefaultBuilder::new(); + let mut builder = database.update()?; - docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?; - docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?; + let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?; + let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?; - builder.build()? - }; - - database.ingest_update_file(update)?; - let view = database.view(); + let view = builder.commit()?; let de_doc0: SimpleDoc = view.document_by_id(docid0)?; let de_doc1: SimpleDoc = view.document_by_id(docid1)?; @@ -267,11 +219,11 @@ mod tests { } #[test] - fn ingest_two_update_files() -> Result<(), Box> { - let dir = tempdir()?; + fn ingest_two_easy_updates() -> Result<(), Box> { + let dir = tempfile::tempdir()?; let stop_words = HashSet::new(); - let rocksdb_path = dir.path().join("rocksdb.rdb"); + let meilidb_path = dir.path().join("meilidb.mdb"); #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] struct SimpleDoc { @@ -290,7 +242,7 @@ mod tests { builder.build() }; - let database = Database::create(&rocksdb_path, &schema)?; + let database = Database::create(&meilidb_path, &schema)?; let doc0 = SimpleDoc { id: 0, @@ -317,36 +269,17 @@ mod tests { timestamp: 7654321, }; - let docid0; - let docid1; - let update1 = { - let tokenizer_builder = DefaultBuilder::new(); - let update_path = dir.path().join("update-000.sst"); - let mut builder = UpdateBuilder::new(update_path, schema.clone()); + let tokenizer_builder = DefaultBuilder::new(); - docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?; - docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?; + let mut builder = database.update()?; + let docid0 = builder.update_document(&doc0, &tokenizer_builder, &stop_words)?; + let docid1 = builder.update_document(&doc1, &tokenizer_builder, &stop_words)?; + builder.commit()?; - builder.build()? - }; - - let docid2; - let docid3; - let update2 = { - let tokenizer_builder = DefaultBuilder::new(); - let update_path = dir.path().join("update-001.sst"); - let mut builder = UpdateBuilder::new(update_path, schema); - - docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?; - docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?; - - builder.build()? - }; - - database.ingest_update_file(update1)?; - database.ingest_update_file(update2)?; - - let view = database.view(); + let mut builder = database.update()?; + let docid2 = builder.update_document(&doc2, &tokenizer_builder, &stop_words)?; + let docid3 = builder.update_document(&doc3, &tokenizer_builder, &stop_words)?; + let view = builder.commit()?; let de_doc0: SimpleDoc = view.document_by_id(docid0)?; let de_doc1: SimpleDoc = view.document_by_id(docid1)?; @@ -380,7 +313,6 @@ mod bench { use rand::seq::SliceRandom; use crate::tokenizer::DefaultBuilder; - use crate::database::update::UpdateBuilder; use crate::database::schema::*; use super::*; @@ -425,9 +357,8 @@ mod bench { description: String, } - let path = dir.path().join("update-000.sst"); let tokenizer_builder = DefaultBuilder; - let mut builder = UpdateBuilder::new(path, schema); + let mut builder = database.update()?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..300 { @@ -439,8 +370,7 @@ mod bench { builder.update_document(&document, &tokenizer_builder, &stop_words)?; } - let update = builder.build()?; - database.ingest_update_file(update)?; + builder.commit()?; drop(database); @@ -472,9 +402,8 @@ mod bench { description: String, } - let path = dir.path().join("update-000.sst"); let tokenizer_builder = DefaultBuilder; - let mut builder = UpdateBuilder::new(path, schema); + let mut builder = database.update()?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..3000 { @@ -486,8 +415,7 @@ mod bench { builder.update_document(&document, &tokenizer_builder, &stop_words)?; } - let update = builder.build()?; - database.ingest_update_file(update)?; + builder.commit()?; drop(database); @@ -520,9 +448,8 @@ mod bench { description: String, } - let path = dir.path().join("update-000.sst"); let tokenizer_builder = DefaultBuilder; - let mut builder = UpdateBuilder::new(path, schema); + let mut builder = database.update()?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..30_000 { @@ -534,8 +461,7 @@ mod bench { builder.update_document(&document, &tokenizer_builder, &stop_words)?; } - let update = builder.build()?; - database.ingest_update_file(update)?; + builder.commit()?; drop(database); @@ -567,9 +493,8 @@ mod bench { description: String, } - let path = dir.path().join("update-000.sst"); let tokenizer_builder = DefaultBuilder; - let mut builder = UpdateBuilder::new(path, schema); + let mut builder = database.update()?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..300 { @@ -581,8 +506,7 @@ mod bench { builder.update_document(&document, &tokenizer_builder, &stop_words)?; } - let update = builder.build()?; - let view = database.ingest_update_file(update)?; + let view = builder.commit()?; bench.iter(|| { for q in &["a", "b", "c", "d", "e"] { @@ -614,9 +538,8 @@ mod bench { description: String, } - let path = dir.path().join("update-000.sst"); let tokenizer_builder = DefaultBuilder; - let mut builder = UpdateBuilder::new(path, schema); + let mut builder = database.update()?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..3000 { @@ -628,8 +551,7 @@ mod bench { builder.update_document(&document, &tokenizer_builder, &stop_words)?; } - let update = builder.build()?; - let view = database.ingest_update_file(update)?; + let view = builder.commit()?; bench.iter(|| { for q in &["a", "b", "c", "d", "e"] { @@ -662,9 +584,8 @@ mod bench { description: String, } - let path = dir.path().join("update-000.sst"); let tokenizer_builder = DefaultBuilder; - let mut builder = UpdateBuilder::new(path, schema); + let mut builder = database.update()?; let mut rng = XorShiftRng::seed_from_u64(42); for i in 0..30_000 { @@ -676,8 +597,7 @@ mod bench { builder.update_document(&document, &tokenizer_builder, &stop_words)?; } - let update = builder.build()?; - let view = database.ingest_update_file(update)?; + let view = builder.commit()?; bench.iter(|| { for q in &["a", "b", "c", "d", "e"] { diff --git a/src/database/serde/indexer_serializer.rs b/src/database/serde/indexer_serializer.rs index 8055d7856..055600b6f 100644 --- a/src/database/serde/indexer_serializer.rs +++ b/src/database/serde/indexer_serializer.rs @@ -10,15 +10,15 @@ use crate::tokenizer::TokenizerBuilder; use crate::tokenizer::Token; use crate::{DocumentId, DocIndex}; -pub struct IndexerSerializer<'a, B> { +pub struct IndexerSerializer<'a, 'b, B> { pub tokenizer_builder: &'a B, - pub update: &'a mut DocumentUpdate, + pub update: &'a mut DocumentUpdate<'b>, pub document_id: DocumentId, pub attribute: SchemaAttr, pub stop_words: &'a HashSet, } -impl<'a, B> ser::Serializer for IndexerSerializer<'a, B> +impl<'a, 'b, B> ser::Serializer for IndexerSerializer<'a, 'b, B> where B: TokenizerBuilder { type Ok = (); @@ -71,14 +71,14 @@ where B: TokenizerBuilder let char_length = length; let doc_index = DocIndex { document_id, attribute, word_index, char_index, char_length }; - self.update.insert_doc_index(word_unidecoded.into_bytes(), doc_index); + self.update.insert_doc_index(word_unidecoded.into_bytes(), doc_index)?; } let char_index = char_index as u32; let char_length = length; let doc_index = DocIndex { document_id, attribute, word_index, char_index, char_length }; - self.update.insert_doc_index(word_lower.into_bytes(), doc_index); + self.update.insert_doc_index(word_lower.into_bytes(), doc_index)?; } Ok(()) } diff --git a/src/database/serde/mod.rs b/src/database/serde/mod.rs index 44c826487..2f9415c25 100644 --- a/src/database/serde/mod.rs +++ b/src/database/serde/mod.rs @@ -56,3 +56,9 @@ impl fmt::Display for SerializerError { } impl Error for SerializerError {} + +impl From for SerializerError { + fn from(value: String) -> SerializerError { + SerializerError::Custom(value) + } +} diff --git a/src/database/serde/serializer.rs b/src/database/serde/serializer.rs index 79b1d47e1..d516be609 100644 --- a/src/database/serde/serializer.rs +++ b/src/database/serde/serializer.rs @@ -11,15 +11,15 @@ use crate::tokenizer::TokenizerBuilder; use crate::database::schema::Schema; use crate::DocumentId; -pub struct Serializer<'a, B> { +pub struct Serializer<'a, 'b, B> { pub schema: &'a Schema, - pub update: &'a mut DocumentUpdate, + pub update: &'a mut DocumentUpdate<'b>, pub document_id: DocumentId, pub tokenizer_builder: &'a B, pub stop_words: &'a HashSet, } -impl<'a, B> ser::Serializer for Serializer<'a, B> +impl<'a, 'b, B> ser::Serializer for Serializer<'a, 'b, B> where B: TokenizerBuilder { type Ok = (); @@ -28,8 +28,8 @@ where B: TokenizerBuilder type SerializeTuple = ser::Impossible; type SerializeTupleStruct = ser::Impossible; type SerializeTupleVariant = ser::Impossible; - type SerializeMap = MapSerializer<'a, B>; - type SerializeStruct = StructSerializer<'a, B>; + type SerializeMap = MapSerializer<'a, 'b, B>; + type SerializeStruct = StructSerializer<'a, 'b, B>; type SerializeStructVariant = ser::Impossible; forward_to_unserializable_type! { @@ -174,16 +174,16 @@ where B: TokenizerBuilder } } -pub struct MapSerializer<'a, B> { +pub struct MapSerializer<'a, 'b, B> { pub schema: &'a Schema, pub document_id: DocumentId, - pub update: &'a mut DocumentUpdate, + pub update: &'a mut DocumentUpdate<'b>, pub tokenizer_builder: &'a B, pub stop_words: &'a HashSet, pub current_key_name: Option, } -impl<'a, B> ser::SerializeMap for MapSerializer<'a, B> +impl<'a, 'b, B> ser::SerializeMap for MapSerializer<'a, 'b, B> where B: TokenizerBuilder { type Ok = (); @@ -207,7 +207,7 @@ where B: TokenizerBuilder fn serialize_entry( &mut self, key: &K, - value: &V + value: &V, ) -> Result<(), Self::Error> where K: Serialize, V: Serialize, { @@ -217,7 +217,7 @@ where B: TokenizerBuilder let props = self.schema.props(attr); if props.is_stored() { let value = bincode::serialize(value).unwrap(); - self.update.insert_attribute_value(attr, value); + self.update.insert_attribute_value(attr, &value)?; } if props.is_indexed() { let serializer = IndexerSerializer { @@ -239,15 +239,15 @@ where B: TokenizerBuilder } } -pub struct StructSerializer<'a, B> { +pub struct StructSerializer<'a, 'b, B> { pub schema: &'a Schema, pub document_id: DocumentId, - pub update: &'a mut DocumentUpdate, + pub update: &'a mut DocumentUpdate<'b>, pub tokenizer_builder: &'a B, pub stop_words: &'a HashSet, } -impl<'a, B> ser::SerializeStruct for StructSerializer<'a, B> +impl<'a, 'b, B> ser::SerializeStruct for StructSerializer<'a, 'b, B> where B: TokenizerBuilder { type Ok = (); @@ -264,7 +264,7 @@ where B: TokenizerBuilder let props = self.schema.props(attr); if props.is_stored() { let value = bincode::serialize(value).unwrap(); - self.update.insert_attribute_value(attr, value); + self.update.insert_attribute_value(attr, &value)?; } if props.is_indexed() { let serializer = IndexerSerializer { diff --git a/src/database/update.rs b/src/database/update.rs new file mode 100644 index 000000000..ec9eed0df --- /dev/null +++ b/src/database/update.rs @@ -0,0 +1,209 @@ +use std::collections::{HashSet, BTreeMap}; +use std::error::Error; +use std::sync::Arc; + +use rocksdb::rocksdb::{DB, Writable, Snapshot, WriteBatch}; +use hashbrown::hash_map::HashMap; +use serde::Serialize; +use fst::map::Map; +use sdset::Set; + +use crate::database::{DATA_INDEX, Database, DatabaseView}; +use crate::database::index::{Positive, PositiveBuilder, Negative}; +use crate::database::document_key::{DocumentKey, DocumentKeyAttr}; +use crate::database::serde::serializer::Serializer; +use crate::database::serde::SerializerError; +use crate::database::schema::SchemaAttr; +use crate::tokenizer::TokenizerBuilder; +use crate::data::{DocIds, DocIndexes}; +use crate::database::schema::Schema; +use crate::{DocumentId, DocIndex}; +use crate::database::index::Index; + +pub type Token = Vec; // TODO could be replaced by a SmallVec + +pub struct Update<'a> { + database: &'a Database, + schema: Schema, + raw_builder: RawUpdateBuilder, +} + +impl<'a> Update<'a> { + pub(crate) fn new(database: &'a Database, schema: Schema) -> Update<'a> { + Update { database, schema, raw_builder: RawUpdateBuilder::new() } + } + + pub fn update_document( + &mut self, + document: T, + tokenizer_builder: &B, + stop_words: &HashSet, + ) -> Result + where T: Serialize, + B: TokenizerBuilder, + { + let document_id = self.schema.document_id(&document)?; + + let serializer = Serializer { + schema: &self.schema, + document_id: document_id, + tokenizer_builder: tokenizer_builder, + update: &mut self.raw_builder.document_update(document_id)?, + stop_words: stop_words, + }; + + document.serialize(serializer)?; + + Ok(document_id) + } + + pub fn remove_document(&mut self, document: T) -> Result + where T: Serialize, + { + let document_id = self.schema.document_id(&document)?; + self.raw_builder.document_update(document_id)?.remove()?; + Ok(document_id) + } + + pub fn commit(self) -> Result>>, Box> { + let batch = self.raw_builder.build()?; + self.database.db.write(batch)?; + + let snapshot = Snapshot::new(self.database.db.clone()); + let view = Arc::new(DatabaseView::new(snapshot)?); + self.database.view.set(view.clone()); + + Ok(view) + } + + pub fn abort(self) { } +} + +#[derive(Copy, Clone, PartialEq, Eq)] +enum UpdateType { + Updated, + Deleted, +} + +use UpdateType::{Updated, Deleted}; + +pub struct RawUpdateBuilder { + documents_update: HashMap, + indexed_words: BTreeMap>, + batch: WriteBatch, +} + +impl RawUpdateBuilder { + pub fn new() -> RawUpdateBuilder { + RawUpdateBuilder { + documents_update: HashMap::new(), + indexed_words: BTreeMap::new(), + batch: WriteBatch::new(), + } + } + + pub fn document_update(&mut self, document_id: DocumentId) -> Result { + use serde::ser::Error; + + match self.documents_update.get(&document_id) { + Some(Deleted) | None => Ok(DocumentUpdate { document_id, inner: self }), + Some(Updated) => Err(SerializerError::custom( + "This document has already been removed and cannot be updated in the same update" + )), + } + } + + pub fn build(self) -> Result> { + let negative = { + let mut removed_document_ids = Vec::new(); + for (id, update_type) in self.documents_update { + if update_type == Deleted { + removed_document_ids.push(id); + } + } + + removed_document_ids.sort_unstable(); + let removed_document_ids = Set::new_unchecked(&removed_document_ids); + let doc_ids = DocIds::new(removed_document_ids); + + Negative::new(doc_ids) + }; + + let positive = { + let mut positive_builder = PositiveBuilder::memory(); + + for (key, mut indexes) in self.indexed_words { + indexes.sort_unstable(); + let indexes = Set::new_unchecked(&indexes); + positive_builder.insert(key, indexes)?; + } + + let (map, indexes) = positive_builder.into_inner()?; + let map = Map::from_bytes(map)?; + let indexes = DocIndexes::from_bytes(indexes)?; + + Positive::new(map, indexes) + }; + + let index = Index { negative, positive }; + + // write the data-index + let mut bytes = Vec::new(); + index.write_to_bytes(&mut bytes); + self.batch.merge(DATA_INDEX, &bytes)?; + + Ok(self.batch) + } +} + +pub struct DocumentUpdate<'a> { + document_id: DocumentId, + inner: &'a mut RawUpdateBuilder, +} + +impl<'a> DocumentUpdate<'a> { + pub fn remove(&mut self) -> Result<(), SerializerError> { + use serde::ser::Error; + + if let Updated = self.inner.documents_update.entry(self.document_id).or_insert(Deleted) { + return Err(SerializerError::custom( + "This document has already been updated and cannot be removed in the same update" + )); + } + + let start = DocumentKey::new(self.document_id).with_attribute_min(); + let end = DocumentKey::new(self.document_id).with_attribute_max(); // FIXME max + 1 + self.inner.batch.delete_range(start.as_ref(), end.as_ref())?; + + Ok(()) + } + + pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: &[u8]) -> Result<(), SerializerError> { + use serde::ser::Error; + + if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) { + return Err(SerializerError::custom( + "This document has already been deleted and cannot be updated in the same update" + )); + } + + let key = DocumentKeyAttr::new(self.document_id, attr); + self.inner.batch.put(key.as_ref(), &value)?; + + Ok(()) + } + + pub fn insert_doc_index(&mut self, token: Token, doc_index: DocIndex) -> Result<(), SerializerError> { + use serde::ser::Error; + + if let Deleted = self.inner.documents_update.entry(self.document_id).or_insert(Updated) { + return Err(SerializerError::custom( + "This document has already been deleted and cannot be updated in the same update" + )); + } + + self.inner.indexed_words.entry(token).or_insert_with(Vec::new).push(doc_index); + + Ok(()) + } +} diff --git a/src/database/update/builder.rs b/src/database/update/builder.rs deleted file mode 100644 index 931d6c56d..000000000 --- a/src/database/update/builder.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::collections::HashSet; -use std::path::PathBuf; -use std::error::Error; - -use serde::Serialize; - -use crate::database::serde::serializer::Serializer; -use crate::database::serde::SerializerError; -use crate::tokenizer::TokenizerBuilder; -use crate::database::Schema; - -use crate::DocumentId; -use super::{Update, RawUpdateBuilder}; - -pub struct UpdateBuilder { - schema: Schema, - raw_builder: RawUpdateBuilder, -} - -impl UpdateBuilder { - pub fn new(path: PathBuf, schema: Schema) -> UpdateBuilder { - UpdateBuilder { - schema: schema, - raw_builder: RawUpdateBuilder::new(path), - } - } - - pub fn update_document( - &mut self, - document: T, - tokenizer_builder: &B, - stop_words: &HashSet, - ) -> Result - where T: Serialize, - B: TokenizerBuilder, - { - let document_id = self.schema.document_id(&document)?; - let update = self.raw_builder.document_update(document_id); - - let serializer = Serializer { - schema: &self.schema, - document_id: document_id, - tokenizer_builder: tokenizer_builder, - update: update, - stop_words: stop_words, - }; - - document.serialize(serializer)?; - - Ok(document_id) - } - - pub fn remove_document(&mut self, document: T) -> Result - where T: Serialize, - { - let document_id = self.schema.document_id(&document)?; - self.raw_builder.document_update(document_id).remove(); - Ok(document_id) - } - - pub fn build(self) -> Result> { - self.raw_builder.build() - } -} diff --git a/src/database/update/mod.rs b/src/database/update/mod.rs deleted file mode 100644 index 721fc549f..000000000 --- a/src/database/update/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -use std::path::{Path, PathBuf}; - -mod builder; -mod raw_builder; - -pub use self::builder::UpdateBuilder; -pub use self::raw_builder::{RawUpdateBuilder, DocumentUpdate}; - -pub struct Update { - sst_file: PathBuf, -} - -impl Update { - pub fn path(&self) -> &Path { - &self.sst_file - } -} diff --git a/src/database/update/raw_builder.rs b/src/database/update/raw_builder.rs deleted file mode 100644 index b9ed2c347..000000000 --- a/src/database/update/raw_builder.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::collections::btree_map::{BTreeMap, Entry}; -use std::path::PathBuf; -use std::error::Error; - -use rocksdb::rocksdb_options; -use hashbrown::HashMap; -use fst::map::Map; -use sdset::Set; -use log::warn; - -use crate::database::index::{Index, Positive, PositiveBuilder, Negative}; -use crate::database::{DATA_INDEX, DocumentKeyAttr}; -use crate::database::schema::SchemaAttr; -use crate::data::{DocIds, DocIndexes}; -use crate::{DocumentId, DocIndex}; -use super::Update; - -type Token = Vec; // TODO could be replaced by a SmallVec -type Value = Vec; - -pub struct RawUpdateBuilder { - sst_file: PathBuf, - document_updates: BTreeMap, -} - -pub struct DocumentUpdate { - cleared: bool, - words_indexes: HashMap>, - attributes: BTreeMap, -} - -impl DocumentUpdate { - pub fn new() -> DocumentUpdate { - DocumentUpdate { - cleared: false, - words_indexes: HashMap::new(), - attributes: BTreeMap::new(), - } - } - - pub fn remove(&mut self) { - self.cleared = true; - self.clear(); - } - - pub fn clear(&mut self) { - self.words_indexes.clear(); - self.attributes.clear(); - } - - pub fn insert_attribute_value(&mut self, attr: SchemaAttr, value: Vec) { - self.attributes.insert(attr, value); - } - - pub fn insert_doc_index(&mut self, token: Vec, doc_index: DocIndex) { - self.words_indexes.entry(token).or_insert_with(Vec::new).push(doc_index) - } -} - -impl RawUpdateBuilder { - pub fn new(path: PathBuf) -> RawUpdateBuilder { - RawUpdateBuilder { - sst_file: path, - document_updates: BTreeMap::new(), - } - } - - pub fn document_update(&mut self, document_id: DocumentId) -> &mut DocumentUpdate { - match self.document_updates.entry(document_id) { - Entry::Occupied(mut occupied) => { - warn!("Already updated document {:?}, clearing it", document_id); - occupied.get_mut().clear(); - occupied.into_mut() - }, - Entry::Vacant(vacant) => vacant.insert(DocumentUpdate::new()), - } - } - - pub fn build(mut self) -> Result> { - let mut removed_document_ids = Vec::new(); - let mut words_indexes = BTreeMap::new(); - - for (&id, update) in self.document_updates.iter_mut() { - if update.cleared { removed_document_ids.push(id) } - - for (token, indexes) in &update.words_indexes { - words_indexes.entry(token).or_insert_with(Vec::new).extend_from_slice(indexes) - } - } - - let negative = { - let removed_document_ids = Set::new_unchecked(&removed_document_ids); - let doc_ids = DocIds::new(removed_document_ids); - Negative::new(doc_ids) - }; - - let positive = { - let mut positive_builder = PositiveBuilder::memory(); - - for (key, mut indexes) in words_indexes { - indexes.sort_unstable(); - let indexes = Set::new_unchecked(&indexes); - positive_builder.insert(key, indexes)?; - } - - let (map, indexes) = positive_builder.into_inner()?; - let map = Map::from_bytes(map)?; - let indexes = DocIndexes::from_bytes(indexes)?; - Positive::new(map, indexes) - }; - - let index = Index { negative, positive }; - - let env_options = rocksdb_options::EnvOptions::new(); - let column_family_options = rocksdb_options::ColumnFamilyOptions::new(); - let mut file_writer = rocksdb::SstFileWriter::new(env_options, column_family_options); - file_writer.open(&self.sst_file.to_string_lossy())?; - - // write the data-index - let mut bytes = Vec::new(); - index.write_to_bytes(&mut bytes); - file_writer.merge(DATA_INDEX, &bytes)?; - - // write all the documents attributes updates - for (id, update) in self.document_updates { - - let mut last_attr: Option = None; - for (attr, value) in update.attributes { - - if update.cleared { - // if there is no last attribute, remove from the first attribute - let start_attr = match last_attr { - Some(attr) => attr.next(), - None => Some(SchemaAttr::min()) - }; - let start = start_attr.map(|a| DocumentKeyAttr::new(id, a)); - let end = attr.prev().map(|a| DocumentKeyAttr::new(id, a)); - - // delete_range between (last_attr + 1) and (attr - 1) - if let (Some(start), Some(end)) = (start, end) { - file_writer.delete_range(start.as_ref(), end.as_ref())?; - } - } - - let key = DocumentKeyAttr::new(id, attr); - file_writer.put(key.as_ref(), &value)?; - last_attr = Some(attr); - } - - if update.cleared { - // if there is no last attribute, remove from the first attribute - let start_attr = match last_attr { - Some(attr) => attr.next(), - None => Some(SchemaAttr::min()) - }; - let start = start_attr.map(|a| DocumentKeyAttr::new(id, a)); - let end = DocumentKeyAttr::with_attribute_max(id); - - // delete_range between (last_attr + 1) and attr_max - if let Some(start) = start { - file_writer.delete_range(start.as_ref(), end.as_ref())?; - } - } - } - - file_writer.finish()?; - - Ok(Update { sst_file: self.sst_file }) - } -}