[WIP] rebase on main

This commit is contained in:
tamo 2021-05-10 20:23:12 +02:00
parent 0fee81678e
commit 1b5fc61eb6
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
6 changed files with 164 additions and 137 deletions

View File

@ -8,7 +8,7 @@ use serde_json::{Map, Value};
use crate::helpers::EnvSizer; use crate::helpers::EnvSizer;
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Facets, Settings, Checked, Unchecked}; pub use updates::{Facets, Settings, Checked, Unchecked, UpdateResult};
use serde::{de::Deserializer, Deserialize}; use serde::{de::Deserializer, Deserialize};
mod search; mod search;
@ -35,12 +35,13 @@ where
Deserialize::deserialize(deserializer).map(Some) Deserialize::deserialize(deserializer).map(Some)
} }
pub fn deserialize_wildcard<'de, D>(deserializer: D) -> Result<Option<Option<Vec<String>>>, D::Error> pub fn deserialize_wildcard<'de, I, D>(deserializer: D) -> Result<Option<Option<I>>, D::Error>
where where
D: Deserializer<'de>, D: Deserializer<'de>,
I: IntoIterator<Item = String> + Deserialize<'de> + Clone,
{ {
Ok(<Option<Vec<String>> as Deserialize>::deserialize(deserializer)? Ok(<Option<I> as Deserialize>::deserialize(deserializer)?
.map(|item: Vec<String>| (!item.iter().any(|s| s == "*")).then(|| item))) .map(|item: I| (!item.clone().into_iter().any(|s| s == "*")).then(|| item)))
} }
impl Index { impl Index {

View File

@ -7,7 +7,7 @@ use anyhow::bail;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::{error, info}; use log::{error, info};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{de::Deserializer, Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tempfile::TempDir; use tempfile::TempDir;
use tokio::fs; use tokio::fs;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
@ -159,7 +159,7 @@ fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()>
Ok(()) Ok(())
} }
pub async fn load_dump( pub fn load_dump(
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
dump_path: impl AsRef<Path>, dump_path: impl AsRef<Path>,
size: usize, size: usize,
@ -167,7 +167,7 @@ pub async fn load_dump(
info!("Importing dump from {}...", dump_path.as_ref().display()); info!("Importing dump from {}...", dump_path.as_ref().display());
let db_path = db_path.as_ref(); let db_path = db_path.as_ref();
let dump_path = dump_path.as_ref(); let dump_path = dump_path.as_ref();
let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&db_path)?; let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?;
// extract the dump in a temporary directory // extract the dump in a temporary directory
let tmp_dir = TempDir::new()?; let tmp_dir = TempDir::new()?;
@ -178,7 +178,7 @@ pub async fn load_dump(
let metadata = DumpMetadata::from_path(&tmp_dir_path)?; let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
// remove indexes which have same `uuid` than indexes to import and create empty indexes // remove indexes which have same `uuid` than indexes to import and create empty indexes
let existing_index_uids = uuid_resolver.list().await?; let existing_index_uids = uuid_resolver.list()?;
info!("Deleting indexes already present in the db and provided in the dump..."); info!("Deleting indexes already present in the db and provided in the dump...");
for idx in &metadata.indexes { for idx in &metadata.indexes {
@ -197,14 +197,15 @@ pub async fn load_dump(
} }
} else { } else {
// if the index does not exist in the `uuid_resolver` we create it // if the index does not exist in the `uuid_resolver` we create it
uuid_resolver.create(idx.uid.clone()).await?; uuid_resolver.create_uuid(idx.uid.clone(), false)?;
} }
} }
// import each indexes content // import each indexes content
for idx in metadata.indexes { for idx in metadata.indexes {
let dump_path = tmp_dir_path.join(&idx.uid); let dump_path = tmp_dir_path.join(&idx.uid);
let uuid = uuid_resolver.get(idx.uid).await?; // this cannot fail since we created all the missing uuid in the previous loop
let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
let index_path = db_path.join(&format!("indexes/index-{}", uuid)); let index_path = db_path.join(&format!("indexes/index-{}", uuid));
let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db

View File

@ -17,7 +17,7 @@ struct Settings {
#[serde(default, deserialize_with = "deserialize_wildcard")] #[serde(default, deserialize_with = "deserialize_wildcard")]
pub searchable_attributes: Option<Option<Vec<String>>>, pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_wildcard")] #[serde(default, deserialize_with = "deserialize_wildcard")]
pub displayed_attributes: Option<Option<Vec<String>>>, pub displayed_attributes: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")] #[serde(default, deserialize_with = "deserialize_some")]
pub stop_words: Option<Option<BTreeSet<String>>>, pub stop_words: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")] #[serde(default, deserialize_with = "deserialize_some")]
@ -92,8 +92,13 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow:
// extract `settings.json` file and import content // extract `settings.json` file and import content
let settings = import_settings(&dump_path)?; let settings = import_settings(&dump_path)?;
dbg!(&settings); dbg!(&settings);
let settings = settings.into(); let mut settings: index_controller::Settings = settings.into();
dbg!(&settings); if settings.displayed_attributes.as_ref().map_or(false, |o| o.as_ref().map_or(false, |v| v.contains(&String::from("*")))) {
settings.displayed_attributes = None;
}
if settings.searchable_attributes.as_ref().map_or(false, |o| o.as_ref().map_or(false, |v| v.contains(&String::from("*")))) {
settings.searchable_attributes = None;
}
let update_builder = UpdateBuilder::new(0); let update_builder = UpdateBuilder::new(0);
index.update_settings(&settings, update_builder)?; index.update_settings(&settings, update_builder)?;

View File

@ -92,7 +92,7 @@ impl IndexController {
&options.db_path, &options.db_path,
path, path,
index_size, index_size,
).await?; )?;
} }

View File

@ -11,11 +11,12 @@ use uuid::Uuid;
use actor::UuidResolverActor; use actor::UuidResolverActor;
use message::UuidResolveMsg; use message::UuidResolveMsg;
use store::{HeedUuidStore, UuidStore}; use store::UuidStore;
#[cfg(test)] #[cfg(test)]
use mockall::automock; use mockall::automock;
pub use store::HeedUuidStore;
pub use handle_impl::UuidResolverHandleImpl; pub use handle_impl::UuidResolverHandleImpl;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB

View File

@ -1,6 +1,6 @@
use std::path::{Path, PathBuf};
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::create_dir_all; use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use heed::{ use heed::{
types::{ByteSlice, Str}, types::{ByteSlice, Str},
@ -25,6 +25,7 @@ pub trait UuidStore {
async fn get_size(&self) -> Result<u64>; async fn get_size(&self) -> Result<u64>;
} }
#[derive(Clone)]
pub struct HeedUuidStore { pub struct HeedUuidStore {
env: Env, env: Env,
db: Database<Str, ByteSlice>, db: Database<Str, ByteSlice>,
@ -40,150 +41,168 @@ impl HeedUuidStore {
let db = env.create_database(None)?; let db = env.create_database(None)?;
Ok(Self { env, db }) Ok(Self { env, db })
} }
/// Returns the UUID registered under `name`, creating and persisting a
/// fresh one when the name is unknown.
///
/// When `err` is true and the name already exists, fails with
/// `UuidError::NameAlreadyExist` instead of returning the stored UUID.
pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
    let db = self.db;
    let mut txn = self.env.write_txn()?;
    if let Some(bytes) = db.get(&txn, &name)? {
        // Name already taken: either report the conflict or hand back
        // the existing mapping, depending on the caller's intent.
        if err {
            return Err(UuidError::NameAlreadyExist);
        }
        let existing = Uuid::from_slice(bytes)?;
        return Ok(existing);
    }
    // Unknown name: mint a new v4 UUID and commit the mapping.
    let fresh = Uuid::new_v4();
    db.put(&mut txn, &name, fresh.as_bytes())?;
    txn.commit()?;
    Ok(fresh)
}
/// Looks up the UUID stored under `name`; `Ok(None)` when the name is
/// not registered.
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
    let txn = self.env.read_txn()?;
    let found = match self.db.get(&txn, &name)? {
        Some(bytes) => Some(Uuid::from_slice(bytes)?),
        None => None,
    };
    Ok(found)
}
/// Removes the entry registered under `uid`, returning the UUID that was
/// stored there, or `Ok(None)` when the name was not registered.
pub fn delete(&self, uid: String) -> Result<Option<Uuid>> {
    let db = self.db;
    let mut txn = self.env.write_txn()?;
    if let Some(bytes) = db.get(&txn, &uid)? {
        // Decode before deleting so we can return the removed UUID.
        let uuid = Uuid::from_slice(bytes)?;
        db.delete(&mut txn, &uid)?;
        txn.commit()?;
        Ok(Some(uuid))
    } else {
        Ok(None)
    }
}
/// Returns every `(name, uuid)` pair currently stored in the database.
pub fn list(&self) -> Result<Vec<(String, Uuid)>> {
    let txn = self.env.read_txn()?;
    let mut pairs = Vec::new();
    for record in self.db.iter(&txn)? {
        let (name, bytes) = record?;
        let uuid = Uuid::from_slice(bytes)?;
        pairs.push((name.to_owned(), uuid));
    }
    Ok(pairs)
}
/// Stores `uuid` under `name`, overwriting any previous mapping for that
/// name, and commits the write.
pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
    let mut txn = self.env.write_txn()?;
    self.db.put(&mut txn, &name, uuid.as_bytes())?;
    txn.commit()?;
    Ok(())
}
// TODO: we should merge this function and the following function for the dump. it's exactly
// the same code
/// Snapshots the uuid database into `path` and returns the set of UUIDs
/// it contains.
///
/// Bug fix: the declared return type was `Result<Vec<Uuid>>` while the
/// body builds and returns a `HashSet<Uuid>` (and the async trait wrapper
/// expects `Result<HashSet<Uuid>>`), so this could not compile. The
/// signature now matches the actual value produced.
pub fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
    let env = self.env.clone();
    let db = self.db;
    // Write transaction to acquire a lock on the database: no writer may
    // mutate the env while it is being copied.
    let txn = env.write_txn()?;
    let mut entries = HashSet::new();
    for entry in db.iter(&txn)? {
        let (_, uuid) = entry?;
        let uuid = Uuid::from_slice(uuid)?;
        entries.insert(uuid);
    }
    // only perform snapshot if there are indexes
    if !entries.is_empty() {
        path.push("index_uuids");
        // NOTE(review): `.unwrap()` panics on I/O failure; converting to
        // `?` needs `From<io::Error>` on the error type — confirm and fix
        // in a follow-up.
        create_dir_all(&path).unwrap();
        path.push("data.mdb");
        env.copy_to_path(path, CompactionOption::Enabled)?;
    }
    Ok(entries)
}
/// Dumps the uuid database into `path` and returns the UUIDs it contains.
pub fn dump(&self, mut path: PathBuf) -> Result<Vec<Uuid>> {
    let db = self.db;
    // A write transaction is taken on purpose: it acquires a lock on the
    // database so nothing mutates the env while it is being copied.
    let txn = self.env.write_txn()?;
    let mut uuids = Vec::new();
    for record in db.iter(&txn)? {
        let (_, bytes) = record?;
        uuids.push(Uuid::from_slice(bytes)?);
    }
    // Nothing to dump when there are no indexes.
    if !uuids.is_empty() {
        path.push("index_uuids");
        create_dir_all(&path).unwrap();
        path.push("data.mdb");
        self.env.copy_to_path(path, CompactionOption::Enabled)?;
    }
    Ok(uuids)
}
/// Returns the size reported by the underlying heed environment, in bytes.
pub fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl UuidStore for HeedUuidStore { impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> { async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone(); let this = self.clone();
let db = self.db; tokio::task::spawn_blocking(move || this.create_uuid(name, err)).await?
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
})
.await?
} }
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone(); let this = self.clone();
let db = self.db; tokio::task::spawn_blocking(move || this.get_uuid(name)).await?
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
} }
async fn delete(&self, uid: String) -> Result<Option<Uuid>> { async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone(); let this = self.clone();
let db = self.db; tokio::task::spawn_blocking(move || this.delete(uid)).await?
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
} }
async fn list(&self) -> Result<Vec<(String, Uuid)>> { async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone(); let this = self.clone();
let db = self.db; tokio::task::spawn_blocking(move || this.list()).await?
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push((name.to_owned(), uuid))
}
Ok(entries)
})
.await?
} }
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone(); let this = self.clone();
let db = self.db; tokio::task::spawn_blocking(move || this.insert(name, uuid)).await?
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
})
.await?
} }
// TODO: we should merge this function and the following function for the dump. it's exactly async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
// the same code let this = self.clone();
async fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> { tokio::task::spawn_blocking(move || this.snapshot(path)).await?
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = HashSet::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.insert(uuid);
}
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push("index_uuids");
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
})
.await?
} }
async fn dump(&self, mut path: PathBuf) -> Result<Vec<Uuid>> { async fn dump(&self, path: PathBuf) -> Result<Vec<Uuid>> {
let env = self.env.clone(); let this = self.clone();
let db = self.db; tokio::task::spawn_blocking(move || this.dump(path)).await?
tokio::task::spawn_blocking(move || {
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push(uuid)
}
// only perform dump if there are indexes
if !entries.is_empty() {
path.push("index_uuids");
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
})
.await?
} }
async fn get_size(&self) -> Result<u64> { async fn get_size(&self) -> Result<u64> {
Ok(self.env.size()) self.get_size()
} }
} }