Make the engine return CSV string records as documents and headers

Kerollmops 2020-08-31 14:20:42 +02:00
parent bad0663138
commit 580ed1119a
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
8 changed files with 127 additions and 80 deletions


@@ -10,8 +10,10 @@ use anyhow::Context;
 use arc_cache::ArcCache;
 use bstr::ByteSlice as _;
 use cow_utils::CowUtils;
+use csv::StringRecord;
 use flate2::read::GzDecoder;
 use fst::IntoStreamer;
+use heed::BytesEncode;
 use heed::EnvOpenOptions;
 use heed::types::*;
 use log::{debug, info};
@@ -21,8 +23,9 @@ use rayon::prelude::*;
 use roaring::RoaringBitmap;
 use structopt::StructOpt;
-use milli::{SmallVec32, Index, DocumentId, Position, Attribute, BEU32};
+use milli::heed_codec::CsvStringRecordCodec;
 use milli::tokenizer::{simple_tokenizer, only_words};
+use milli::{SmallVec32, Index, DocumentId, Position, Attribute, BEU32};

 const LMDB_MAX_KEY_LENGTH: usize = 511;
 const ONE_MILLION: usize = 1_000_000;
@@ -205,13 +208,17 @@ impl Store {
         Self::write_word_attribute_docids(&mut self.sorter, lrus)
     }

-    pub fn write_headers(&mut self, headers: &[u8]) -> anyhow::Result<()> {
+    pub fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> {
+        let headers = CsvStringRecordCodec::bytes_encode(headers)
+            .with_context(|| format!("could not encode csv record"))?;
         Ok(self.sorter.insert(HEADERS_KEY, headers)?)
     }

-    pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> {
+    pub fn write_document(&mut self, id: DocumentId, record: &StringRecord) -> anyhow::Result<()> {
+        let record = CsvStringRecordCodec::bytes_encode(record)
+            .with_context(|| format!("could not encode csv record"))?;
         self.documents_ids.insert(id);
-        Ok(self.documents_sorter.insert(id.to_be_bytes(), content)?)
+        Ok(self.documents_sorter.insert(id.to_be_bytes(), record)?)
     }

     fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
@@ -487,9 +494,6 @@ fn index_csv(
     // Write the headers into a Vec of bytes and then into the store.
     let headers = rdr.headers()?;
-    let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-    writer.write_byte_record(headers.as_byte_record())?;
-    let headers = writer.into_inner()?;
     store.write_headers(&headers)?;

     let mut before = Instant::now();
@@ -500,7 +504,7 @@ fn index_csv(
         // We skip documents that must not be indexed by this thread.
         if document_id % num_threads == thread_index {
             if document_id % ONE_MILLION == 0 {
-                debug!("We have seen {}m documents so far ({:.02?}).",
+                info!("We have seen {}m documents so far ({:.02?}).",
                     document_id / ONE_MILLION, before.elapsed());
                 before = Instant::now();
             }
@@ -515,9 +519,6 @@ fn index_csv(
             }

             // We write the document in the database.
-            let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-            writer.write_byte_record(document.as_byte_record())?;
-            let document = writer.into_inner()?;
             store.write_document(document_id, &document)?;
         }
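For context, the indexing call path after this change reads roughly like the sketch below: StringRecords flow straight from the csv reader into the store, and the intermediate csv::WriterBuilder round-trip is gone. This is a simplified sketch, not the actual index_csv body; index_from_csv is a hypothetical name and the thread-skipping logic is omitted.

use csv::StringRecord;

fn index_from_csv(store: &mut Store, path: &str) -> anyhow::Result<()> {
    let mut rdr = csv::Reader::from_path(path)?;

    // Headers are passed as a StringRecord; the codec encodes them in the store.
    let headers = rdr.headers()?.clone();
    store.write_headers(&headers)?;

    // Each document record is likewise passed through without manual serialization.
    let mut record = StringRecord::new();
    let mut document_id = 0;
    while rdr.read_record(&mut record)? {
        store.write_document(document_id, &record)?;
        document_id += 1;
    }

    Ok(())
}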


@@ -1,4 +1,4 @@
-use std::io::{self, Write, BufRead};
+use std::io::{self, BufRead};
 use std::iter::once;
 use std::path::PathBuf;
 use std::time::Instant;
@@ -70,12 +70,12 @@ fn main() -> anyhow::Result<()> {
         };

         let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?;
-        let mut stdout = io::stdout();
-        stdout.write_all(&headers)?;
-        for (_id, content) in documents {
-            stdout.write_all(&content)?;
+        let mut wtr = csv::Writer::from_writer(io::stdout());
+        wtr.write_record(&headers)?;
+        for (_id, record) in documents {
+            wtr.write_record(&record)?;
         }
+        wtr.flush()?;

         debug!("Took {:.02?} to find {} documents", before.elapsed(), result.documents_ids.len());
     }
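The explicit wtr.flush()? added above matters because csv::Writer buffers internally and its Drop implementation ignores I/O errors. A minimal standalone sketch of the same output path (hypothetical print_records helper, not in this commit):

use std::io;

fn print_records(headers: &csv::StringRecord, records: &[csv::StringRecord]) -> csv::Result<()> {
    let mut wtr = csv::Writer::from_writer(io::stdout());
    wtr.write_record(headers)?;
    for record in records {
        wtr.write_record(record)?;
    }
    // Flushing here surfaces write errors that Drop would silently swallow.
    wtr.flush()?;
    Ok(())
}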


@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fs::File;
 use std::net::SocketAddr;
@@ -45,20 +44,25 @@ struct Opt {
     http_listen_addr: String,
 }

-fn highlight_string(string: &str, words: &HashSet<String>) -> String {
-    let mut output = String::new();
-    for (token_type, token) in simple_tokenizer(string) {
-        if token_type == TokenType::Word {
-            let lowercase_token = token.to_lowercase();
-            let to_highlight = words.contains(&lowercase_token);
-            if to_highlight { output.push_str("<mark>") }
-            output.push_str(token);
-            if to_highlight { output.push_str("</mark>") }
-        } else {
-            output.push_str(token);
+fn highlight_record(record: &csv::StringRecord, words: &HashSet<String>) -> csv::StringRecord {
+    let mut output_record = csv::StringRecord::new();
+    let mut buffer = String::new();
+    for field in record {
+        buffer.clear();
+        for (token_type, token) in simple_tokenizer(field) {
+            if token_type == TokenType::Word {
+                let lowercase_token = token.to_lowercase();
+                let to_highlight = words.contains(&lowercase_token);
+                if to_highlight { buffer.push_str("<mark>") }
+                buffer.push_str(token);
+                if to_highlight { buffer.push_str("</mark>") }
+            } else {
+                buffer.push_str(token);
+            }
         }
+        output_record.push_field(&buffer);
     }
-    output
+    output_record
 }

 #[derive(Template)]
@@ -186,23 +190,27 @@ async fn main() -> anyhow::Result<()> {
                 .execute()
                 .unwrap();

-            let mut body = Vec::new();
-            if let Some(headers) = index.headers(&rtxn).unwrap() {
-                // We write the headers
-                body.extend_from_slice(headers);
-
-                let documents = index.documents(&rtxn, documents_ids).unwrap();
-                for (_id, content) in documents {
-                    let content = std::str::from_utf8(content.as_ref()).unwrap();
-                    let content = if disable_highlighting {
-                        Cow::from(content)
-                    } else {
-                        Cow::from(highlight_string(content, &found_words))
-                    };
-                    body.extend_from_slice(content.as_bytes());
-                }
-            }
+            let body = match index.headers(&rtxn).unwrap() {
+                Some(headers) => {
+                    let mut wtr = csv::Writer::from_writer(Vec::new());
+                    // We write the headers
+                    wtr.write_record(&headers).unwrap();
+
+                    let documents = index.documents(&rtxn, documents_ids).unwrap();
+                    for (_id, record) in documents {
+                        let record = if disable_highlighting {
+                            record
+                        } else {
+                            highlight_record(&record, &found_words)
+                        };
+                        wtr.write_record(&record).unwrap();
+                    }
+                    wtr.into_inner().unwrap()
+                },
+                None => Vec::new(),
+            };

             Response::builder()
                 .header("Content-Type", "text/csv")
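A hypothetical spot-check of the new highlight_record from the hunk above, assuming milli's simple_tokenizer emits the space as a non-word token (this test is not part of the commit):

use std::collections::HashSet;

fn highlight_example() {
    let words: HashSet<String> = std::iter::once("world".to_string()).collect();
    let record = csv::StringRecord::from(vec!["hello world", "untouched"]);
    let highlighted = highlight_record(&record, &words);

    // Only matching word tokens are wrapped; other fields pass through as-is.
    assert_eq!(&highlighted[0], "hello <mark>world</mark>");
    assert_eq!(&highlighted[1], "untouched");
}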


@@ -0,0 +1,26 @@
+use std::borrow::Cow;
+
+use csv::{StringRecord, Writer, ReaderBuilder};
+
+pub struct CsvStringRecordCodec;
+
+impl heed::BytesDecode<'_> for CsvStringRecordCodec {
+    type DItem = StringRecord;
+
+    fn bytes_decode(bytes: &[u8]) -> Option<Self::DItem> {
+        let mut reader = ReaderBuilder::new()
+            .has_headers(false)
+            .buffer_capacity(bytes.len()) // we will just read this one record
+            .from_reader(bytes);
+        reader.records().next()?.ok() // next() returns an Option of a Result
+    }
+}
+
+impl heed::BytesEncode<'_> for CsvStringRecordCodec {
+    type EItem = StringRecord;
+
+    fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
+        let mut writer = Writer::from_writer(Vec::new());
+        writer.write_record(item).ok()?;
+        writer.into_inner().ok().map(Cow::Owned)
+    }
+}
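Since this codec is the heart of the commit, here is a quick round-trip of it as a usage sketch (a hypothetical test, not part of the diff; it only uses the bytes_encode/bytes_decode signatures shown above):

use csv::StringRecord;
use heed::{BytesDecode, BytesEncode};
use milli::heed_codec::CsvStringRecordCodec;

fn codec_roundtrip() {
    let record = StringRecord::from(vec!["id", "title", "description"]);

    // Encode to the bytes that LMDB will store, then decode them back.
    let bytes = CsvStringRecordCodec::bytes_encode(&record).unwrap();
    let decoded = CsvStringRecordCodec::bytes_decode(&bytes).unwrap();

    assert_eq!(record, decoded);
}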


@@ -1,5 +1,7 @@
+mod csv_string_record_codec;
 mod roaring_bitmap_codec;
 mod str_beu32_codec;

+pub use self::csv_string_record_codec::CsvStringRecordCodec;
 pub use self::roaring_bitmap_codec::RoaringBitmapCodec;
 pub use self::str_beu32_codec::StrBEU32Codec;


@@ -9,13 +9,14 @@ use std::collections::HashMap;
 use std::hash::BuildHasherDefault;

 use anyhow::Context;
+use csv::StringRecord;
 use fxhash::{FxHasher32, FxHasher64};
 use heed::types::*;
 use heed::{PolyDatabase, Database};

 pub use self::search::{Search, SearchResult};
 pub use self::criterion::{Criterion, default_criteria};
-use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec};
+use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec, CsvStringRecordCodec};

 pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
 pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
@@ -59,21 +60,17 @@ impl Index {
         })
     }

-    pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> {
-        Ok(self.main.put::<_, Str, ByteSlice>(wtxn, HEADERS_KEY, headers)?)
+    pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &StringRecord) -> heed::Result<()> {
+        self.main.put::<_, Str, CsvStringRecordCodec>(wtxn, HEADERS_KEY, headers)
     }

-    pub fn headers<'t>(&self, rtxn: &'t heed::RoTxn) -> heed::Result<Option<&'t [u8]>> {
-        self.main.get::<_, Str, ByteSlice>(rtxn, HEADERS_KEY)
+    pub fn headers(&self, rtxn: &heed::RoTxn) -> heed::Result<Option<StringRecord>> {
+        self.main.get::<_, Str, CsvStringRecordCodec>(rtxn, HEADERS_KEY)
    }

-    pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
+    pub fn number_of_attributes(&self, rtxn: &heed::RoTxn) -> anyhow::Result<Option<usize>> {
         match self.headers(rtxn)? {
-            Some(headers) => {
-                let mut rdr = csv::Reader::from_reader(headers);
-                let headers = rdr.headers()?;
-                Ok(Some(headers.len()))
-            }
+            Some(headers) => Ok(Some(headers.len())),
             None => Ok(None),
         }
     }
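The headers getter and setter are now typed end to end. A sketch of the new API surface, assuming an open heed env and an index are in scope (not part of the diff):

// Write the headers through the codec...
let mut wtxn = env.write_txn()?;
index.put_headers(&mut wtxn, &csv::StringRecord::from(vec!["id", "title"]))?;
wtxn.commit()?;

// ...and read them back as a StringRecord, whose len() is the attribute count.
let rtxn = env.read_txn()?;
assert_eq!(index.number_of_attributes(&rtxn)?, Some(2));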
@@ -94,13 +91,25 @@ impl Index {
         &self,
         rtxn: &'t heed::RoTxn,
         iter: impl IntoIterator<Item=DocumentId>,
-    ) -> anyhow::Result<Vec<(DocumentId, Vec<u8>)>>
+    ) -> anyhow::Result<Vec<(DocumentId, StringRecord)>>
     {
-        iter.into_iter().map(|id| {
-            let content = self.documents.get(rtxn, &BEU32::new(id))?
+        let ids: Vec<_> = iter.into_iter().collect();
+        let mut content = Vec::new();
+
+        for id in ids.iter().cloned() {
+            let document_content = self.documents.get(rtxn, &BEU32::new(id))?
                 .with_context(|| format!("Could not find document {}", id))?;
-            Ok((id, content.to_vec()))
-        }).collect()
+            content.extend_from_slice(document_content);
+        }
+
+        let mut rdr = csv::ReaderBuilder::new().has_headers(false).from_reader(&content[..]);
+        let mut documents = Vec::with_capacity(ids.len());
+
+        for (id, result) in ids.into_iter().zip(rdr.records()) {
+            documents.push((id, result?));
+        }
+
+        Ok(documents)
     }

     /// Returns the number of documents indexed in the database.
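The new documents body concatenates every encoded record into one buffer and parses them back in a single headerless pass. A standalone sketch of that batching idea (hypothetical concat_and_parse helper, not in the commit):

use csv::StringRecord;

fn concat_and_parse(encoded: &[Vec<u8>]) -> csv::Result<Vec<StringRecord>> {
    // Each encoded record already ends with the csv writer's '\n' terminator,
    // so the concatenation is itself a valid headerless CSV document.
    let mut buffer = Vec::new();
    for bytes in encoded {
        buffer.extend_from_slice(bytes);
    }

    let mut rdr = csv::ReaderBuilder::new().has_headers(false).from_reader(&buffer[..]);
    rdr.records().collect()
}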