// MeiliSearch/cli/src/main.rs
use std::collections::BTreeMap;
use std::fs::File;
2022-01-20 18:31:47 +01:00
use std::io::{stdin, BufRead, BufReader, Cursor, Read, Write};
use std::path::PathBuf;
use std::str::FromStr;
2022-01-20 18:31:47 +01:00
use std::time::Instant;
use byte_unit::Byte;
use eyre::Result;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use milli::update::UpdateIndexingStep::{
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
};
use milli::update::{self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig};
2022-01-20 18:31:47 +01:00
use milli::Index;
use serde_json::{Map, Value};
use structopt::StructOpt;
// On Linux builds only, install jemalloc as the process-wide global allocator.
#[cfg(target_os = "linux")]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[derive(Debug, StructOpt)]
#[structopt(name = "Milli CLI", about = "A simple CLI to manipulate a milli index.")]
struct Cli {
2022-03-07 10:30:47 +01:00
#[structopt(short, long, default_value = ".")]
index_path: PathBuf,
#[structopt(short = "s", long, default_value = "100GiB")]
index_size: Byte,
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
verbose: usize,
#[structopt(subcommand)]
subcommand: Command,
}
#[derive(Debug, StructOpt)]
enum Command {
2022-01-20 18:31:47 +01:00
Documents {
#[structopt(subcommand)]
cmd: Documents,
},
Search(Search),
2022-01-20 18:31:47 +01:00
Settings {
#[structopt(subcommand)]
cmd: Settings,
},
}
impl Performer for Command {
    /// Forwards execution to the selected subcommand, handing it ownership of `index`.
    fn perform(self, index: Index) -> Result<()> {
        match self {
            Self::Documents { cmd: documents } => documents.perform(index),
            Self::Search(search) => search.perform(index),
            Self::Settings { cmd: settings } => settings.perform(index),
        }
    }
}
// Subcommands of `settings`.
// (`//` comments on purpose: structopt would turn `///` into help text.)
#[derive(Debug, StructOpt)]
enum Settings {
// Apply the changes described by `SettingsUpdate`.
Update(SettingsUpdate),
// Print the current settings of the index.
Show,
}
impl Settings {
    /// Reads the settings of `index` inside a read transaction and prints them to stdout.
    ///
    /// Displayed and searchable attributes fall back to `*` when the index returns none.
    fn show(&self, index: Index) -> Result<()> {
        let rtxn = index.read_txn()?;

        let displayed: Option<Vec<String>> = index
            .displayed_fields(&rtxn)?
            .map(|names| names.into_iter().map(String::from).collect());
        let searchable: Option<Vec<String>> = index
            .searchable_fields(&rtxn)?
            .map(|names| names.into_iter().map(String::from).collect());

        let filterable: Vec<_> = index.filterable_fields(&rtxn)?.into_iter().collect();
        let sortable: Vec<_> = index.sortable_fields(&rtxn)?.into_iter().collect();

        let ranking_rules: Vec<_> =
            index.criteria(&rtxn)?.into_iter().map(|criterion| criterion.to_string()).collect();

        // A missing stop-word set is shown as an empty list.
        let stop_words: Vec<String> = index
            .stop_words(&rtxn)?
            .map(|set| set.stream().into_strs())
            .transpose()?
            .unwrap_or_default();

        let distinct = index.distinct_field(&rtxn)?.map(String::from);

        // in milli each word in the synonyms map were split on their separator. Since we lost
        // this information we are going to put space between words.
        let synonyms: BTreeMap<_, Vec<_>> = index
            .synonyms(&rtxn)?
            .iter()
            .map(|(words, alternatives)| {
                let joined_alternatives =
                    alternatives.iter().map(|alternative| alternative.join(" ")).collect();
                (words.join(" "), joined_alternatives)
            })
            .collect();

        let rendered_synonyms: String = synonyms
            .into_iter()
            .map(|(word, alternatives)| format!("\n\t{}:\n{:?}", word, alternatives))
            .collect();

        println!(
            "displayed attributes:\n\t{}\nsearchable attributes:\n\t{}\nfilterable attributes:\n\t{}\nsortable attributes:\n\t{}\ncriterion:\n\t{}\nstop words:\n\t{}\ndistinct fields:\n\t{}\nsynonyms:\n\t{}\n",
            displayed.unwrap_or_else(|| vec!["*".to_owned()]).join("\n\t"),
            searchable.unwrap_or_else(|| vec!["*".to_owned()]).join("\n\t"),
            filterable.join("\n\t"),
            sortable.join("\n\t"),
            ranking_rules.join("\n\t"),
            stop_words.join("\n\t"),
            distinct.unwrap_or_default(),
            rendered_synonyms,
        );

        Ok(())
    }
}
impl Performer for Settings {
    /// Runs the chosen settings subcommand: either applies an update or prints the settings.
    fn perform(self, index: Index) -> Result<()> {
        match self {
            Self::Update(settings_update) => settings_update.perform(index),
            Self::Show => self.show(index),
        }
    }
}
// Subcommands of `documents`.
// (`//` comments on purpose: structopt would turn `///` into help text.)
#[derive(Debug, StructOpt)]
enum Documents {
// Add documents to the index (see `DocumentAddition`).
Add(DocumentAddition),
}
impl Performer for Documents {
    /// Runs the selected documents subcommand against `index`.
    fn perform(self, index: Index) -> Result<()> {
        // `Add` is the only variant, so the pattern is irrefutable.
        let Self::Add(addition) = self;
        addition.perform(index)
    }
}
/// A CLI action that can be executed against an opened index.
trait Performer {
/// Consumes the action and runs it, taking ownership of `index`.
fn perform(self, index: Index) -> Result<()>;
}
/// Installs the `color_eyre` error reporter and initializes the stderr logger.
///
/// The logger verbosity comes from `opt.verbose`; level labels and timestamps are turned off.
fn setup(opt: &Cli) -> Result<()> {
    color_eyre::install()?;

    let mut logger = stderrlog::new();
    logger.verbosity(opt.verbose);
    logger.show_level(false);
    logger.timestamp(stderrlog::Timestamp::Off);
    logger.init()?;

    Ok(())
}
/// Entry point: parses the arguments, sets up reporting/logging, opens the index and
/// runs the requested subcommand.
fn main() -> Result<()> {
    let cli = Cli::from_args();
    setup(&cli)?;

    // The requested index size becomes the map size of the environment.
    let mut options = heed::EnvOpenOptions::new();
    options.map_size(cli.index_size.get_bytes() as usize);
    let index = milli::Index::new(options, cli.index_path)?;

    cli.subcommand.perform(index)?;

    Ok(())
}
/// Input formats accepted when adding documents.
#[derive(Debug)]
enum DocumentAdditionFormat {
/// Parsed by `documents_from_csv`.
Csv,
/// Parsed by `documents_from_json`.
Json,
/// Parsed by `documents_from_jsonl`, one line at a time.
Jsonl,
}
impl FromStr for DocumentAdditionFormat {
    type Err = eyre::Error;

    /// Parses `csv`, `jsonl` or `json` (exact, lowercase) into a format.
    ///
    /// Any other string yields an `invalid format: <input>` error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let format = match s {
            "csv" => Self::Csv,
            "jsonl" => Self::Jsonl,
            "json" => Self::Json,
            unknown => eyre::bail!("invalid format: {}", unknown),
        };
        Ok(format)
    }
}
// Arguments of the `documents add` subcommand.
// (`//` comments on purpose where no help text existed: structopt would turn `///`
// into help output.)
#[derive(Debug, StructOpt)]
struct DocumentAddition {
// Format of the input; restricted to the values listed in `possible_values`.
#[structopt(short, long, default_value = "json", possible_values = &["csv", "jsonl", "json"])]
format: DocumentAdditionFormat,
/// Path to the update file, if not present, will read from stdin.
#[structopt(short, long)]
path: Option<PathBuf>,
/// Specify the primary key.
#[structopt(long)]
primary: Option<String>,
/// Whether to generate missing document ids.
#[structopt(short, long)]
autogen_docids: bool,
/// Whether to update or replace the documents if they already exist.
#[structopt(short, long)]
update_documents: bool,
}
2022-01-20 18:31:47 +01:00
impl Performer for DocumentAddition {
fn perform(self, index: milli::Index) -> Result<()> {
let reader: Box<dyn Read> = match self.path {
Some(ref path) => {
let file = File::open(path)?;
Box::new(file)
}
None => Box::new(stdin()),
};
println!("parsing documents...");
2022-03-07 10:29:52 +01:00
let reader = BufReader::new(reader);
let documents = match self.format {
DocumentAdditionFormat::Csv => documents_from_csv(reader)?,
DocumentAdditionFormat::Json => documents_from_json(reader)?,
DocumentAdditionFormat::Jsonl => documents_from_jsonl(reader)?,
};
let reader = milli::documents::DocumentBatchReader::from_reader(Cursor::new(documents))?;
println!("Adding {} documents to the index.", reader.len());
let mut txn = index.env.write_txn()?;
let config = milli::update::IndexerConfig { log_every_n: Some(100), ..Default::default() };
let update_method = if self.update_documents {
IndexDocumentsMethod::UpdateDocuments
} else {
IndexDocumentsMethod::ReplaceDocuments
};
if let Some(primary) = self.primary {
let mut builder = update::Settings::new(&mut txn, &index, &config);
builder.set_primary_key(primary);
builder.execute(|_| ()).unwrap();
}
let indexing_config = IndexDocumentsConfig {
update_method,
autogenerate_docids: self.autogen_docids,
..Default::default()
};
let mut bars = Vec::new();
let progesses = MultiProgress::new();
for _ in 0..4 {
let bar = ProgressBar::hidden();
let bar = progesses.add(bar);
bars.push(bar);
}
let mut addition = milli::update::IndexDocuments::new(
&mut txn,
&index,
&config,
indexing_config,
|step| indexing_callback(step, &bars),
);
addition.add_documents(reader)?;
std::thread::spawn(move || {
progesses.join().unwrap();
});
let result = addition.execute()?;
txn.commit()?;
println!("{:?}", result);
Ok(())
}
}
/// Progress callback: reflects the indexing `step` on the progress bar assigned to it.
///
/// `bars` is indexed by `step.step()`. The bar of the previous step, if any, is finished
/// at its current position before the current one is updated.
fn indexing_callback(step: milli::update::UpdateIndexingStep, bars: &[ProgressBar]) {
    let current = step.step();
    let bar = &bars[current];

    // Moving on to a new step: freeze the bar of the step that came before it.
    if let Some(previous) = current.checked_sub(1).map(|index| &bars[index]) {
        if !previous.is_finished() {
            previous.disable_steady_tick();
            previous.finish_at_current_pos();
        }
    }

    let bar_style = ProgressStyle::default_bar()
        .template("[eta: {eta_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}")
        .progress_chars("##-");

    // Shared update routine for the three steps that report "seen out of total".
    let show_progress = move |message: &'static str, position: u64, length: u64| {
        bar.set_style(bar_style);
        bar.set_length(length);
        bar.set_message(message);
        bar.set_position(position);
    };

    match step {
        RemapDocumentAddition { documents_seen } => {
            // No total is reported for this step, so a spinner is used instead of a bar.
            bar.set_style(ProgressStyle::default_spinner());
            bar.set_message(format!("remaped {} documents so far.", documents_seen));
        }
        ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
            show_progress("Merging documents...", documents_seen as u64, total_documents as u64)
        }
        IndexDocuments { documents_seen, total_documents } => {
            show_progress("Indexing documents...", documents_seen as u64, total_documents as u64)
        }
        MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
            show_progress("Merging databases...", databases_seen as u64, total_databases as u64)
        }
    }

    bar.enable_steady_tick(200);
}
fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
2021-10-25 17:38:32 +02:00
let mut buf = String::new();
let mut reader = BufReader::new(reader);
while reader.read_line(&mut buf)? > 0 {
documents.extend_from_json(&mut buf.as_bytes())?;
}
documents.finish()?;
Ok(writer.into_inner())
}
/// Converts the JSON read from `reader` into a document batch held in memory.
///
/// The whole reader is handed to the batch builder in a single call.
///
/// # Errors
///
/// Returns an error if the builder cannot be created, rejects the input, or fails to finish.
fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
    let mut writer = Cursor::new(Vec::new());
    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

    documents.extend_from_json(reader)?;
    documents.finish()?;

    Ok(writer.into_inner())
}
fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
2021-10-25 17:38:32 +02:00
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
Ok(writer.into_inner())
}
// Arguments of the `search` subcommand.
// (`//` comments on purpose: structopt would turn `///` into help text.)
#[derive(Debug, StructOpt)]
struct Search {
    // Query string (positional); when absent no query is set on the search.
    query: Option<String>,
    // Filter expression, parsed with `milli::Filter::from_str`.
    #[structopt(short, long)]
    filter: Option<String>,
    // Offset forwarded to the search when present.
    #[structopt(short, long)]
    offset: Option<usize>,
    // Limit forwarded to the search when present.
    #[structopt(short, long)]
    limit: Option<usize>,
    // Read one query per line from stdin; cannot be combined with `query`.
    #[structopt(short, long, conflicts_with = "query")]
    interactive: bool,
}
impl Performer for Search {
    /// Runs either a single search with `self.query`, or, in interactive mode, one search
    /// per line read from stdin until input ends or a line cannot be read.
    ///
    /// Hits are printed to stdout as pretty JSON; the prompt and timing go to stderr.
    fn perform(self, index: milli::Index) -> Result<()> {
        // Executes one query and reports its hits and the time it took.
        let run_query = |query: &Option<String>| -> Result<()> {
            let started = Instant::now();
            let jsons = Self::perform_single_search(
                &index,
                query,
                &self.filter,
                &self.offset,
                &self.limit,
            )?;
            let elapsed = started.elapsed();

            let hits = serde_json::to_string_pretty(&jsons)?;

            println!("{}", hits);
            eprintln!("found {} results in {:.02?}", jsons.len(), elapsed);
            Ok(())
        };

        if !self.interactive {
            return run_query(&self.query);
        }

        let stdin = std::io::stdin();
        let mut lines = stdin.lock().lines();

        loop {
            eprint!("> ");
            std::io::stdout().flush()?;

            // End of input and read errors both end the session.
            let line = match lines.next() {
                Some(Ok(line)) => line,
                _ => break,
            };
            run_query(&Some(line))?;
        }

        Ok(())
    }
}
impl Search {
2022-01-20 18:31:47 +01:00
fn perform_single_search(
index: &milli::Index,
query: &Option<String>,
filter: &Option<String>,
offset: &Option<usize>,
limit: &Option<usize>,
) -> Result<Vec<Map<String, Value>>> {
let txn = index.env.read_txn()?;
let mut search = index.search(&txn);
2022-01-20 18:31:47 +01:00
if let Some(ref query) = query {
search.query(query);
}
2022-01-20 18:31:47 +01:00
if let Some(ref filter) = filter {
if let Some(condition) = milli::Filter::from_str(filter)? {
search.filter(condition);
}
}
2022-01-20 18:31:47 +01:00
if let Some(offset) = offset {
search.offset(*offset);
}
2022-01-20 18:31:47 +01:00
if let Some(limit) = limit {
search.limit(*limit);
}
let result = search.execute()?;
let fields_ids_map = index.fields_ids_map(&txn)?;
let displayed_fields =
index.displayed_fields_ids(&txn)?.unwrap_or_else(|| fields_ids_map.ids().collect());
let documents = index.documents(&txn, result.documents_ids)?;
let mut jsons = Vec::new();
for (_, obkv) in documents {
let json = milli::obkv_to_json(&displayed_fields, &fields_ids_map, obkv)?;
jsons.push(json);
}
2022-01-20 18:31:47 +01:00
Ok(jsons)
}
}
#[derive(Debug, StructOpt)]
struct SettingsUpdate {
2022-02-01 20:10:16 +01:00
#[structopt(long)]
filterable_attributes: Option<Vec<String>>,
2022-02-01 20:10:16 +01:00
#[structopt(long)]
criteria: Option<Vec<String>>,
}
2022-01-20 18:31:47 +01:00
impl Performer for SettingsUpdate {
fn perform(self, index: milli::Index) -> Result<()> {
let mut txn = index.env.write_txn()?;
let config = IndexerConfig { log_every_n: Some(100), ..Default::default() };
let mut update = milli::update::Settings::new(&mut txn, &index, &config);
if let Some(ref filterable_attributes) = self.filterable_attributes {
if !filterable_attributes.is_empty() {
update.set_filterable_fields(filterable_attributes.iter().cloned().collect());
} else {
update.reset_filterable_fields();
}
}
2022-02-01 20:10:16 +01:00
if let Some(criteria) = self.criteria {
if !criteria.is_empty() {
update.set_criteria(criteria);
} else {
update.reset_criteria();
}
}
let mut bars = Vec::new();
let progesses = MultiProgress::new();
for _ in 0..4 {
let bar = ProgressBar::hidden();
let bar = progesses.add(bar);
bars.push(bar);
}
std::thread::spawn(move || {
progesses.join().unwrap();
});
2021-11-03 13:12:01 +01:00
update.execute(|step| indexing_callback(step, &bars))?;
txn.commit()?;
Ok(())
}
}