// Mirror of https://github.com/meilisearch/MeiliSearch
// (synced 2025-01-12 22:37:29 +01:00).
// 335 lines, 12 KiB, Rust.
#![allow(dead_code)]
|
|
|
|
use std::fs::{create_dir_all, remove_dir_all, File};
|
|
use std::io::{self, BufReader, BufWriter, Read};
|
|
use std::path::Path;
|
|
use std::str::FromStr as _;
|
|
|
|
use anyhow::Context;
|
|
use bumpalo::Bump;
|
|
use criterion::BenchmarkId;
|
|
use memmap2::Mmap;
|
|
use milli::heed::EnvOpenOptions;
|
|
use milli::update::new::indexer;
|
|
use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
|
|
use milli::vector::EmbeddingConfigs;
|
|
use milli::{Criterion, Filter, Index, Object, TermsMatchingStrategy};
|
|
use serde_json::Value;
|
|
|
|
pub struct Conf<'a> {
|
|
/// where we are going to create our database.mmdb directory
|
|
/// each benchmark will first try to delete it and then recreate it
|
|
pub database_name: &'a str,
|
|
/// the dataset to be used, it must be an uncompressed csv
|
|
pub dataset: &'a str,
|
|
/// The format of the dataset
|
|
pub dataset_format: &'a str,
|
|
pub group_name: &'a str,
|
|
pub queries: &'a [&'a str],
|
|
/// here you can change which criterion are used and in which order.
|
|
/// - if you specify something all the base configuration will be thrown out
|
|
/// - if you don't specify anything (None) the default configuration will be kept
|
|
pub criterion: Option<&'a [&'a str]>,
|
|
/// the last chance to configure your database as you want
|
|
pub configure: fn(&mut Settings),
|
|
pub filter: Option<&'a str>,
|
|
pub sort: Option<Vec<&'a str>>,
|
|
/// enable or disable the optional words on the query
|
|
pub optional_words: bool,
|
|
/// primary key, if there is None we'll auto-generate docids for every documents
|
|
pub primary_key: Option<&'a str>,
|
|
}
|
|
|
|
impl Conf<'_> {
|
|
pub const BASE: Self = Conf {
|
|
database_name: "benches.mmdb",
|
|
dataset_format: "csv",
|
|
dataset: "",
|
|
group_name: "",
|
|
queries: &[],
|
|
criterion: None,
|
|
configure: |_| (),
|
|
filter: None,
|
|
sort: None,
|
|
optional_words: true,
|
|
primary_key: None,
|
|
};
|
|
}
|
|
|
|
pub fn base_setup(conf: &Conf) -> Index {
|
|
match remove_dir_all(conf.database_name) {
|
|
Ok(_) => (),
|
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
|
|
Err(e) => panic!("{}", e),
|
|
}
|
|
create_dir_all(conf.database_name).unwrap();
|
|
|
|
let mut options = EnvOpenOptions::new();
|
|
options.map_size(100 * 1024 * 1024 * 1024); // 100 GB
|
|
options.max_readers(100);
|
|
let index = Index::new(options, conf.database_name).unwrap();
|
|
|
|
let config = IndexerConfig::default();
|
|
let mut wtxn = index.write_txn().unwrap();
|
|
let mut builder = Settings::new(&mut wtxn, &index, &config);
|
|
|
|
if let Some(primary_key) = conf.primary_key {
|
|
builder.set_primary_key(primary_key.to_string());
|
|
}
|
|
|
|
if let Some(criterion) = conf.criterion {
|
|
builder.reset_filterable_fields();
|
|
builder.reset_criteria();
|
|
builder.reset_stop_words();
|
|
|
|
let criterion = criterion.iter().map(|s| Criterion::from_str(s).unwrap()).collect();
|
|
builder.set_criteria(criterion);
|
|
}
|
|
|
|
(conf.configure)(&mut builder);
|
|
|
|
builder.execute(|_| (), || false).unwrap();
|
|
wtxn.commit().unwrap();
|
|
|
|
let config = IndexerConfig::default();
|
|
let mut wtxn = index.write_txn().unwrap();
|
|
let rtxn = index.read_txn().unwrap();
|
|
let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
|
let mut new_fields_ids_map = db_fields_ids_map.clone();
|
|
|
|
let documents = documents_from(conf.dataset, conf.dataset_format);
|
|
let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
|
|
indexer.add_documents(&documents).unwrap();
|
|
|
|
let indexer_alloc = Bump::new();
|
|
let (document_changes, _operation_stats, primary_key) = indexer
|
|
.into_changes(
|
|
&indexer_alloc,
|
|
&index,
|
|
&rtxn,
|
|
None,
|
|
&mut new_fields_ids_map,
|
|
&|| false,
|
|
&|_progress| (),
|
|
)
|
|
.unwrap();
|
|
|
|
indexer::index(
|
|
&mut wtxn,
|
|
&index,
|
|
&milli::ThreadPoolNoAbortBuilder::new().build().unwrap(),
|
|
config.grenad_parameters(),
|
|
&db_fields_ids_map,
|
|
new_fields_ids_map,
|
|
primary_key,
|
|
&document_changes,
|
|
EmbeddingConfigs::default(),
|
|
&|| false,
|
|
&|_| (),
|
|
)
|
|
.unwrap();
|
|
|
|
wtxn.commit().unwrap();
|
|
drop(rtxn);
|
|
|
|
index
|
|
}
|
|
|
|
pub fn run_benches(c: &mut criterion::Criterion, confs: &[Conf]) {
|
|
for conf in confs {
|
|
let index = base_setup(conf);
|
|
|
|
let file_name = Path::new(conf.dataset).file_name().and_then(|f| f.to_str()).unwrap();
|
|
let name = format!("{}: {}", file_name, conf.group_name);
|
|
let mut group = c.benchmark_group(&name);
|
|
|
|
for &query in conf.queries {
|
|
group.bench_with_input(BenchmarkId::from_parameter(query), &query, |b, &query| {
|
|
b.iter(|| {
|
|
let rtxn = index.read_txn().unwrap();
|
|
let mut search = index.search(&rtxn);
|
|
search.query(query).terms_matching_strategy(TermsMatchingStrategy::default());
|
|
if let Some(filter) = conf.filter {
|
|
let filter = Filter::from_str(filter).unwrap().unwrap();
|
|
search.filter(filter);
|
|
}
|
|
if let Some(sort) = &conf.sort {
|
|
let sort = sort.iter().map(|sort| sort.parse().unwrap()).collect();
|
|
search.sort_criteria(sort);
|
|
}
|
|
let _ids = search.execute().unwrap();
|
|
});
|
|
});
|
|
}
|
|
group.finish();
|
|
|
|
index.prepare_for_closing().wait();
|
|
}
|
|
}
|
|
|
|
pub fn documents_from(filename: &str, filetype: &str) -> Mmap {
|
|
let file = File::open(filename)
|
|
.unwrap_or_else(|_| panic!("could not find the dataset in: {filename}"));
|
|
match filetype {
|
|
"csv" => documents_from_csv(file).unwrap(),
|
|
"json" => documents_from_json(file).unwrap(),
|
|
"jsonl" => documents_from_jsonl(file).unwrap(),
|
|
otherwise => panic!("invalid update format {otherwise:?}"),
|
|
}
|
|
}
|
|
|
|
fn documents_from_jsonl(file: File) -> anyhow::Result<Mmap> {
|
|
unsafe { Mmap::map(&file).map_err(Into::into) }
|
|
}
|
|
|
|
fn documents_from_json(file: File) -> anyhow::Result<Mmap> {
|
|
let reader = BufReader::new(file);
|
|
let documents: Vec<milli::Object> = serde_json::from_reader(reader)?;
|
|
let mut output = tempfile::tempfile().map(BufWriter::new)?;
|
|
|
|
for document in documents {
|
|
serde_json::to_writer(&mut output, &document)?;
|
|
}
|
|
|
|
let file = output.into_inner()?;
|
|
unsafe { Mmap::map(&file).map_err(Into::into) }
|
|
}
|
|
|
|
fn documents_from_csv(file: File) -> anyhow::Result<Mmap> {
|
|
let output = tempfile::tempfile()?;
|
|
let mut output = BufWriter::new(output);
|
|
let mut reader = csv::ReaderBuilder::new().from_reader(file);
|
|
|
|
let headers = reader.headers().context("while retrieving headers")?.clone();
|
|
let typed_fields: Vec<_> = headers.iter().map(parse_csv_header).collect();
|
|
let mut object: serde_json::Map<_, _> =
|
|
typed_fields.iter().map(|(k, _)| (k.to_string(), Value::Null)).collect();
|
|
|
|
let mut line = 0;
|
|
let mut record = csv::StringRecord::new();
|
|
while reader.read_record(&mut record).context("while reading a record")? {
|
|
// We increment here and not at the end of the loop
|
|
// to take the header offset into account.
|
|
line += 1;
|
|
|
|
// Reset the document values
|
|
object.iter_mut().for_each(|(_, v)| *v = Value::Null);
|
|
|
|
for (i, (name, atype)) in typed_fields.iter().enumerate() {
|
|
let value = &record[i];
|
|
let trimmed_value = value.trim();
|
|
let value = match atype {
|
|
AllowedType::Number if trimmed_value.is_empty() => Value::Null,
|
|
AllowedType::Number => {
|
|
match trimmed_value.parse::<i64>() {
|
|
Ok(integer) => Value::from(integer),
|
|
Err(_) => match trimmed_value.parse::<f64>() {
|
|
Ok(float) => Value::from(float),
|
|
Err(error) => {
|
|
anyhow::bail!("document format error on line {line}: {error}. For value: {value}")
|
|
}
|
|
},
|
|
}
|
|
}
|
|
AllowedType::Boolean if trimmed_value.is_empty() => Value::Null,
|
|
AllowedType::Boolean => match trimmed_value.parse::<bool>() {
|
|
Ok(bool) => Value::from(bool),
|
|
Err(error) => {
|
|
anyhow::bail!(
|
|
"document format error on line {line}: {error}. For value: {value}"
|
|
)
|
|
}
|
|
},
|
|
AllowedType::String if value.is_empty() => Value::Null,
|
|
AllowedType::String => Value::from(value),
|
|
};
|
|
|
|
*object.get_mut(name).expect("encountered an unknown field") = value;
|
|
}
|
|
|
|
serde_json::to_writer(&mut output, &object).context("while writing to disk")?;
|
|
}
|
|
|
|
let output = output.into_inner()?;
|
|
unsafe { Mmap::map(&output).map_err(Into::into) }
|
|
}
|
|
|
|
/// Value types a CSV column can be declared with through a `name:type` header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AllowedType {
    /// `name:string`, and the fallback for untyped or unrecognized headers.
    String,
    /// `name:boolean`.
    Boolean,
    /// `name:number`.
    Number,
}

/// Splits a CSV header of the form `name:type` into its field name and type.
///
/// Only the last `:` is considered, so field names may themselves contain
/// colons. Recognized types are `string`, `boolean` and `number`. When there is
/// no `:` or the suffix is not a recognized type, the whole header is kept as
/// the field name and the column is treated as a string.
fn parse_csv_header(header: &str) -> (String, AllowedType) {
    let typed = header.rsplit_once(':').and_then(|(field_name, field_type)| {
        let field_type = match field_type {
            "string" => AllowedType::String,
            "boolean" => AllowedType::Boolean,
            "number" => AllowedType::Number,
            // Unknown suffix: the colon belongs to the field name.
            _ => return None,
        };
        Some((field_name.to_string(), field_type))
    });

    typed.unwrap_or_else(|| (header.to_string(), AllowedType::String))
}
|
|
|
|
struct CSVDocumentDeserializer<R>
|
|
where
|
|
R: Read,
|
|
{
|
|
documents: csv::StringRecordsIntoIter<R>,
|
|
headers: Vec<(String, AllowedType)>,
|
|
}
|
|
|
|
impl<R: Read> CSVDocumentDeserializer<R> {
|
|
fn from_reader(reader: R) -> io::Result<Self> {
|
|
let mut records = csv::Reader::from_reader(reader);
|
|
|
|
let headers = records.headers()?.into_iter().map(parse_csv_header).collect();
|
|
|
|
Ok(Self { documents: records.into_records(), headers })
|
|
}
|
|
}
|
|
|
|
impl<R: Read> Iterator for CSVDocumentDeserializer<R> {
|
|
type Item = anyhow::Result<Object>;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
let csv_document = self.documents.next()?;
|
|
|
|
match csv_document {
|
|
Ok(csv_document) => {
|
|
let mut document = Object::new();
|
|
|
|
for ((field_name, field_type), value) in
|
|
self.headers.iter().zip(csv_document.into_iter())
|
|
{
|
|
let parsed_value: anyhow::Result<Value> = match field_type {
|
|
AllowedType::Number => {
|
|
value.parse::<f64>().map(Value::from).map_err(Into::into)
|
|
}
|
|
AllowedType::Boolean => {
|
|
value.parse::<bool>().map(Value::from).map_err(Into::into)
|
|
}
|
|
AllowedType::String => Ok(Value::String(value.to_string())),
|
|
};
|
|
|
|
match parsed_value {
|
|
Ok(value) => drop(document.insert(field_name.to_string(), value)),
|
|
Err(_e) => {
|
|
return Some(Err(anyhow::anyhow!(
|
|
"Value '{}' is not a valid number",
|
|
value
|
|
)))
|
|
}
|
|
}
|
|
}
|
|
|
|
Some(Ok(document))
|
|
}
|
|
Err(e) => Some(Err(anyhow::anyhow!("Error parsing csv document: {}", e))),
|
|
}
|
|
}
|
|
}
|