use meta from milli

This commit is contained in:
mpostma 2021-03-11 19:40:18 +01:00
parent 1fad72e019
commit 79a4bc8129
No known key found for this signature in database
GPG key ID: CBC8A7C1D7A28C3A
5 changed files with 207 additions and 210 deletions

View file

@ -38,7 +38,8 @@ main_error = "0.1.0"
meilisearch-error = { path = "../meilisearch-error" }
meilisearch-tokenizer = { git = "https://github.com/meilisearch/Tokenizer.git", branch = "main" }
memmap = "0.7.0"
milli = { git = "https://github.com/meilisearch/milli.git", rev = "794fce7" }
milli = { path = "../../milli/milli" }
#milli = { git = "https://github.com/meilisearch/milli.git", rev = "794fce7" }
mime = "0.3.16"
once_cell = "1.5.2"
rand = "0.7.3"

View file

@ -6,7 +6,7 @@ use either::Either;
use anyhow::bail;
use heed::RoTxn;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use milli::{FacetCondition, facet::FacetValue};
use milli::{FacetCondition, MatchingWords, facet::FacetValue};
use serde::{Serialize, Deserialize};
use serde_json::{Value, Map};
@ -71,7 +71,7 @@ impl Index {
let milli::SearchResult {
documents_ids,
found_words,
matching_words,
candidates,
..
} = search.execute()?;
@ -102,7 +102,7 @@ impl Index {
for (_id, obkv) in self.documents(&rtxn, documents_ids)? {
let mut object = milli::obkv_to_json(&displayed_fields_ids, &fields_ids_map, obkv).unwrap();
if let Some(ref attributes_to_highlight) = query.attributes_to_highlight {
highlighter.highlight_record(&mut object, &found_words, attributes_to_highlight);
highlighter.highlight_record(&mut object, &matching_words, attributes_to_highlight);
}
documents.push(object);
}
@ -173,7 +173,7 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
Self { analyzer }
}
pub fn highlight_value(&self, value: Value, words_to_highlight: &HashSet<String>) -> Value {
pub fn highlight_value(&self, value: Value, words_to_highlight: &MatchingWords) -> Value {
match value {
Value::Null => Value::Null,
Value::Bool(boolean) => Value::Bool(boolean),
@ -183,7 +183,7 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
let analyzed = self.analyzer.analyze(&old_string);
for (word, token) in analyzed.reconstruct() {
if token.is_word() {
let to_highlight = words_to_highlight.contains(token.text());
let to_highlight = words_to_highlight.matches(token.text());
if to_highlight {
string.push_str("<mark>")
}
@ -215,7 +215,7 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
pub fn highlight_record(
&self,
object: &mut Map<String, Value>,
words_to_highlight: &HashSet<String>,
words_to_highlight: &MatchingWords,
attributes_to_highlight: &HashSet<String>,
) {
// TODO: do we need to create a string for elements that are not strings but need to be highlighted?

View file

@ -12,7 +12,7 @@ use super::Index;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
DocumentsAddition(DocumentAdditionResult),
DocumentDeletion { deleted: usize },
DocumentDeletion { deleted: u64 },
Other,
}

View file

@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::fs::{create_dir_all, remove_dir_all, File};
use std::fs::{create_dir_all, File};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@ -8,15 +8,13 @@ use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::pin_mut;
use futures::stream::StreamExt;
use heed::{
types::{ByteSlice, SerdeBincode},
Database, Env, EnvOpenOptions,
};
use heed::EnvOpenOptions;
use log::debug;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::{sync::{mpsc, oneshot, RwLock}};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::spawn_blocking;
use tokio::fs::remove_dir_all;
use uuid::Uuid;
use super::get_arc_ownership_blocking;
@ -36,12 +34,21 @@ type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<U
/// Metadata describing an index; (de)serialized with camelCase field names.
// NOTE(review): `IndexMeta::new` builds this struct from `created_at`,
// `updated_at` and `primary_key` only and never sets `uuid` — confirm whether
// the `uuid` field is a leftover that should be removed.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
// Identifier of the index this metadata belongs to.
uuid: Uuid,
// Creation timestamp of the index (UTC).
created_at: DateTime<Utc>,
// Timestamp of the last update of the index (UTC).
updated_at: DateTime<Utc>,
// Name of the primary key, when one is set.
primary_key: Option<String>,
}
impl IndexMeta {
    /// Reads the metadata of `index` inside a single read transaction.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while opening the transaction or while
    /// reading the creation date, the update date or the primary key.
    fn new(index: &Index) -> Result<Self> {
        let rtxn = index.read_txn()?;
        // Fields are listed in the order they are read from the index.
        Ok(Self {
            created_at: index.created_at(&rtxn)?,
            updated_at: index.updated_at(&rtxn)?,
            primary_key: index.primary_key(&rtxn)?.map(String::from),
        })
    }
}
enum IndexMsg {
CreateIndex {
uuid: Uuid,
@ -106,14 +113,9 @@ pub enum IndexError {
/// Storage backend the `IndexActor` relies on to create, look up and delete
/// indexes identified by a `Uuid`.
// NOTE(review): this trait lists both metadata-returning methods
// (`create_index`, `update_index`, `get_meta`) and index-returning ones
// (`create`, `get`, `delete`) — confirm which set is meant to remain.
#[async_trait::async_trait]
trait IndexStore {
/// Creates the index identified by `uuid`, optionally with `primary_key`,
/// and returns its `IndexMeta`.
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
/// Runs `f` on the index identified by `uuid` and returns the value `f` produced.
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where
F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static;
/// Creates the index identified by `uuid`, optionally with `primary_key`,
/// and returns the opened `Index`.
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
/// Returns the index identified by `uuid`, or `None` when there is none.
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
/// Deletes the index identified by `uuid`; returns the `Index` handle that
/// was held for it, if any.
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
/// Returns the `IndexMeta` recorded for `uuid`, or `None` when there is none.
async fn get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>>;
}
impl<S: IndexStore + Sync + Send> IndexActor<S> {
@ -245,7 +247,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
self.store.create_index(uuid, primary_key).await
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta)
}
async fn handle_update(
@ -256,16 +262,13 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
debug!("Processing update {}", meta.id());
let uuid = meta.index_uuid().clone();
let update_handler = self.update_handler.clone();
let handle = self
.store
.update_index(uuid, |index| {
let handle =
spawn_blocking(move || update_handler.handle_update(meta, data, index));
Ok(handle)
})
.await?;
handle.await.map_err(|e| IndexError::Error(e.into()))
let index = match self.store.get(uuid.clone()).await? {
Some(index) => index,
None => self.store.create(uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
@ -338,8 +341,15 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
}
/// Returns the metadata of the index identified by `uuid`.
///
/// Yields `Ok(None)` when the store has no index for `uuid`.
///
/// # Errors
///
/// Propagates store errors and errors from `IndexMeta::new`; a failure of the
/// blocking task itself is reported as `IndexError::Error`.
async fn handle_get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
    // The metadata is derived from the index itself rather than from a
    // separate `get_meta` lookup, so only `store.get` is consulted.
    match self.store.get(uuid).await? {
        Some(index) => {
            // `IndexMeta::new` opens a read transaction, so it is run on the
            // blocking thread pool instead of the async executor.
            let meta = spawn_blocking(move || IndexMeta::new(&index))
                .await
                .map_err(|e| IndexError::Error(e.into()))??;
            Ok(Some(meta))
        }
        None => Ok(None),
    }
}
}
@ -451,24 +461,15 @@ impl IndexActorHandle {
}
/// `IndexStore` implementation that keeps each index in its own
/// `index-{uuid}` directory under `path`.
struct HeedIndexStore {
// heed environment opened on `path`; used for the metadata transactions.
env: Env,
// Metadata database created in `env`, keyed by the index uuid bytes.
db: Database<ByteSlice, SerdeBincode<IndexMeta>>,
// In-memory map of the indexes that were already opened, keyed by uuid.
index_store: AsyncMap<Uuid, Index>,
// Root directory holding the per-index directories.
path: PathBuf,
}
impl HeedIndexStore {
fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let mut options = EnvOpenOptions::new();
options.map_size(1_073_741_824); //1GB
let path = path.as_ref().join("indexes/");
create_dir_all(&path)?;
let env = options.open(&path)?;
let db = env.create_database(None)?;
let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self {
env,
db,
index_store,
path,
})
@ -477,76 +478,22 @@ impl HeedIndexStore {
#[async_trait::async_trait]
impl IndexStore for HeedIndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
}
let env = self.env.clone();
let db = self.db.clone();
let (index, meta) = spawn_blocking(move || -> Result<(Index, IndexMeta)> {
let now = Utc::now();
let meta = IndexMeta {
uuid: uuid.clone(),
created_at: now.clone(),
updated_at: now,
primary_key,
};
let mut txn = env.write_txn()?;
db.put(&mut txn, uuid.as_bytes(), &meta)?;
txn.commit()?;
let index = spawn_blocking(move || -> Result<Index> {
let index = open_index(&path, 4096 * 100_000)?;
Ok((index, meta))
Ok(index)
})
.await
.expect("thread died")?;
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid.clone(), index);
self.index_store.write().await.insert(uuid.clone(), index.clone());
Ok(meta)
}
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where
F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static,
{
let guard = self.index_store.read().await;
let index = match guard.get(&uuid) {
Some(index) => index.clone(),
None => {
drop(guard);
self.create_index(uuid.clone(), None).await?;
self.index_store
.read()
.await
.get(&uuid)
.expect("Index should exist")
.clone()
}
};
let env = self.env.clone();
let db = self.db.clone();
spawn_blocking(move || {
let mut txn = env.write_txn()?;
let mut meta = db.get(&txn, uuid.as_bytes())?.expect("unexisting index");
match f(index) {
Ok(r) => {
meta.updated_at = Utc::now();
db.put(&mut txn, uuid.as_bytes(), &meta)?;
txn.commit()?;
Ok(r)
}
Err(e) => Err(e),
}
})
.await
.expect("thread died")
Ok(index)
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
@ -561,54 +508,33 @@ impl IndexStore for HeedIndexStore {
return Ok(None);
}
// TODO: set this info from the database
let index = spawn_blocking(|| open_index(path, 4096 * 100_000))
.await
.expect("thread died")?;
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store
.write()
.await
.insert(uuid.clone(), index.clone());
println!("here");
Ok(Some(index))
}
}
}
/// Deletes the index identified by `uuid`: removes its `index-{uuid}`
/// directory and drops it from the in-memory map, returning the handle that
/// was held there, if any.
// NOTE(review): this body contains two removal paths for `db_path`. The
// blocking closure deletes the metadata entry and removes the directory with
// `unwrap()`; the awaited `remove_dir_all` afterwards maps its error to
// `IndexError::Error`. `db_path` is moved into the closure, so both paths
// cannot coexist — confirm which one is intended and drop the other.
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let env = self.env.clone();
let db = self.db.clone();
let db_path = self.path.join(format!("index-{}", uuid));
// Blocking path: delete the metadata entry, then remove the directory.
spawn_blocking(move || -> Result<()> {
let mut txn = env.write_txn()?;
db.delete(&mut txn, uuid.as_bytes())?;
txn.commit()?;
remove_dir_all(db_path).unwrap();
Ok(())
})
.await
.expect("thread died")?;
// Async path: remove the directory and surface failures as `IndexError::Error`.
remove_dir_all(db_path).await
.map_err(|e| IndexError::Error(e.into()))?;
// Forget the opened index and hand it back to the caller.
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
async fn get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
let env = self.env.clone();
let db = self.db.clone();
spawn_blocking(move || {
let txn = env.read_txn()?;
let meta = db.get(&txn, uuid.as_bytes())?;
Ok(meta)
})
.await
.expect("thread died")
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> Result<Index> {
create_dir_all(&path).expect("can't create db");
create_dir_all(&path)
.map_err(|e| IndexError::Error(e.into()))?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path).map_err(|e| IndexError::Error(e))?;
let index = milli::Index::new(options, &path)
.map_err(|e| IndexError::Error(e))?;
Ok(Index(Arc::new(index)))
}