Fix the benchmark tests

This commit is contained in:
Clément Renault 2024-11-19 10:45:27 +01:00
parent aba8a0e9e0
commit 3cf1352ae1
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
6 changed files with 919 additions and 398 deletions

View file

@ -1,17 +1,19 @@
#![allow(dead_code)]
use std::fs::{create_dir_all, remove_dir_all, File};
use std::io::{self, BufRead, BufReader, Cursor, Read, Seek};
use std::io::{self, BufReader, BufWriter, Read};
use std::num::ParseFloatError;
use std::path::Path;
use std::str::FromStr;
use anyhow::Context;
use bumpalo::Bump;
use criterion::BenchmarkId;
use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use memmap2::Mmap;
use milli::heed::EnvOpenOptions;
use milli::update::{
IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings,
};
use milli::update::new::indexer;
use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings};
use milli::vector::EmbeddingConfigs;
use milli::{Criterion, Filter, Index, Object, TermsMatchingStrategy};
use serde_json::Value;
@ -92,18 +94,34 @@ pub fn base_setup(conf: &Conf) -> Index {
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let indexing_config = IndexDocumentsConfig {
autogenerate_docids: conf.primary_key.is_none(),
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap();
let rtxn = index.read_txn().unwrap();
let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let mut new_fields_ids_map = db_fields_ids_map.clone();
let documents = documents_from(conf.dataset, conf.dataset_format);
let (builder, user_error) = builder.add_documents(documents).unwrap();
user_error.unwrap();
builder.execute().unwrap();
let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
indexer.add_documents(&documents).unwrap();
let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) =
indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map).unwrap();
indexer::index(
&mut wtxn,
&index,
config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
EmbeddingConfigs::default(),
&|| false,
&|_| (),
)
.unwrap();
wtxn.commit().unwrap();
drop(rtxn);
index
}
@ -141,48 +159,95 @@ pub fn run_benches(c: &mut criterion::Criterion, confs: &[Conf]) {
}
pub fn documents_from(filename: &str, filetype: &str) -> Mmap {
let reader = File::open(filename)
.unwrap_or_else(|_| panic!("could not find the dataset in: {}", filename));
let reader = BufReader::new(reader);
let documents = match filetype {
"csv" => documents_from_csv(reader).unwrap(),
"json" => documents_from_json(reader).unwrap(),
"jsonl" => documents_from_jsonl(reader).unwrap(),
otherwise => panic!("invalid update format {:?}", otherwise),
};
DocumentsBatchReader::from_reader(Cursor::new(documents)).unwrap()
let file = File::open(filename)
.unwrap_or_else(|_| panic!("could not find the dataset in: {filename}"));
match filetype {
"csv" => documents_from_csv(file).unwrap(),
"json" => documents_from_json(file).unwrap(),
"jsonl" => documents_from_jsonl(file).unwrap(),
otherwise => panic!("invalid update format {otherwise:?}"),
}
}
fn documents_from_jsonl(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new());
fn documents_from_jsonl(file: File) -> anyhow::Result<Mmap> {
unsafe { Mmap::map(&file).map_err(Into::into) }
}
for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() {
let object = result?;
documents.append_json_object(&object)?;
fn documents_from_json(file: File) -> anyhow::Result<Mmap> {
let reader = BufReader::new(file);
let documents: Vec<milli::Object> = serde_json::from_reader(reader)?;
let mut output = tempfile::tempfile().map(BufWriter::new)?;
for document in documents {
serde_json::to_writer(&mut output, &document)?;
}
documents.into_inner().map_err(Into::into)
let file = output.into_inner()?;
unsafe { Mmap::map(&file).map_err(Into::into) }
}
fn documents_from_json(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new());
fn documents_from_csv(file: File) -> anyhow::Result<Mmap> {
let output = tempfile::tempfile()?;
let mut output = BufWriter::new(output);
let mut reader = csv::ReaderBuilder::new().from_reader(file);
documents.append_json_array(reader)?;
let headers = reader.headers().context("while retrieving headers")?.clone();
let typed_fields: Vec<_> = headers.iter().map(parse_csv_header).collect();
let mut object: serde_json::Map<_, _> =
typed_fields.iter().map(|(k, _)| (k.to_string(), Value::Null)).collect();
documents.into_inner().map_err(Into::into)
}
let mut line = 0;
let mut record = csv::StringRecord::new();
while reader.read_record(&mut record).context("while reading a record")? {
// We increment here and not at the end of the loop
// to take the header offset into account.
line += 1;
fn documents_from_csv(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let csv = csv::Reader::from_reader(reader);
// Reset the document values
object.iter_mut().for_each(|(_, v)| *v = Value::Null);
let mut documents = DocumentsBatchBuilder::new(Vec::new());
documents.append_csv(csv)?;
for (i, (name, atype)) in typed_fields.iter().enumerate() {
let value = &record[i];
let trimmed_value = value.trim();
let value = match atype {
AllowedType::Number if trimmed_value.is_empty() => Value::Null,
AllowedType::Number => {
match trimmed_value.parse::<i64>() {
Ok(integer) => Value::from(integer),
Err(_) => match trimmed_value.parse::<f64>() {
Ok(float) => Value::from(float),
Err(error) => {
anyhow::bail!("document format error on line {line}: {error}. For value: {value}")
}
},
}
}
AllowedType::Boolean if trimmed_value.is_empty() => Value::Null,
AllowedType::Boolean => match trimmed_value.parse::<bool>() {
Ok(bool) => Value::from(bool),
Err(error) => {
anyhow::bail!(
"document format error on line {line}: {error}. For value: {value}"
)
}
},
AllowedType::String if value.is_empty() => Value::Null,
AllowedType::String => Value::from(value),
};
documents.into_inner().map_err(Into::into)
*object.get_mut(name).expect("encountered an unknown field") = value;
}
serde_json::to_writer(&mut output, &object).context("while writing to disk")?;
}
let output = output.into_inner()?;
unsafe { Mmap::map(&output).map_err(Into::into) }
}
enum AllowedType {
String,
Boolean,
Number,
}
@ -191,8 +256,9 @@ fn parse_csv_header(header: &str) -> (String, AllowedType) {
match header.rsplit_once(':') {
Some((field_name, field_type)) => match field_type {
"string" => (field_name.to_string(), AllowedType::String),
"boolean" => (field_name.to_string(), AllowedType::Boolean),
"number" => (field_name.to_string(), AllowedType::Number),
// we may return an error in this case.
// if the pattern isn't recognized, we keep the whole field.
_otherwise => (header.to_string(), AllowedType::String),
},
None => (header.to_string(), AllowedType::String),
@ -230,10 +296,13 @@ impl<R: Read> Iterator for CSVDocumentDeserializer<R> {
for ((field_name, field_type), value) in
self.headers.iter().zip(csv_document.into_iter())
{
let parsed_value: Result<Value, ParseFloatError> = match field_type {
let parsed_value: anyhow::Result<Value> = match field_type {
AllowedType::Number => {
value.parse::<f64>().map(Value::from).map_err(Into::into)
}
AllowedType::Boolean => {
value.parse::<bool>().map(Value::from).map_err(Into::into)
}
AllowedType::String => Ok(Value::String(value.to_string())),
};