diff --git a/Cargo.lock b/Cargo.lock
index 759e3ec77..a8d659d1a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -629,7 +629,7 @@ dependencies = [
  "futures-util",
  "http",
  "indexmap",
- "log 0.4.8",
+ "log 0.4.11",
  "slab",
  "tokio",
  "tokio-util",
@@ -771,7 +771,7 @@ dependencies = [
  "http-body",
  "httparse",
  "itoa",
- "log 0.4.8",
+ "log 0.4.11",
  "pin-project",
  "socket2",
  "time",
@@ -933,14 +933,14 @@ version = "0.3.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
 dependencies = [
- "log 0.4.8",
+ "log 0.4.11",
 ]
 
 [[package]]
 name = "log"
-version = "0.4.8"
+version = "0.4.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
+checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
 dependencies = [
  "cfg-if",
 ]
@@ -1005,7 +1005,7 @@ dependencies = [
  "itertools",
  "jemallocator",
  "levenshtein_automata",
- "log 0.4.8",
+ "log 0.4.11",
  "memmap",
  "once_cell",
  "oxidized-mtbl",
@@ -1081,7 +1081,7 @@ dependencies = [
  "iovec",
  "kernel32-sys",
  "libc",
- "log 0.4.8",
+ "log 0.4.11",
  "miow 0.2.1",
  "net2",
  "slab",
@@ -1094,7 +1094,7 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3"
 dependencies = [
- "log 0.4.8",
+ "log 0.4.11",
  "mio",
  "miow 0.3.4",
  "winapi 0.3.8",
@@ -1141,7 +1141,7 @@ checksum = "136eed74cadb9edd2651ffba732b19a450316b680e4f48d6c79e905799e19d01"
 dependencies = [
  "buf_redux",
  "httparse",
- "log 0.4.8",
+ "log 0.4.11",
  "mime 0.2.6",
  "mime_guess 1.8.8",
  "quick-error",
@@ -1228,11 +1228,12 @@ checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
 [[package]]
 name = "oxidized-mtbl"
 version = "0.1.0"
-source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=4ca66e5#4ca66e50115da760f602e878943af59f06c53af1"
+source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=5426182#5426182d9ad8b74a9ebb386f03d33ce073cef0e0"
 dependencies = [
  "byteorder",
  "crc32c",
  "flate2",
+ "log 0.4.11",
  "memmap",
  "snap",
  "tempfile",
@@ -1836,7 +1837,7 @@ checksum = "32e5ee9b90a5452c570a0b0ac1c99ae9498db7e56e33d74366de7f2a7add7f25"
 dependencies = [
  "atty",
  "chrono",
- "log 0.4.8",
+ "log 0.4.11",
  "termcolor",
  "thread_local",
 ]
@@ -2015,7 +2016,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b8b8fe88007ebc363512449868d7da4389c9400072a3f666f212c7280082882a"
 dependencies = [
  "futures",
- "log 0.4.8",
+ "log 0.4.11",
  "pin-project",
  "tokio",
  "tungstenite",
@@ -2030,7 +2031,7 @@ dependencies = [
  "bytes",
  "futures-core",
  "futures-sink",
- "log 0.4.8",
+ "log 0.4.11",
  "pin-project-lite",
  "tokio",
 ]
@@ -2068,7 +2069,7 @@ dependencies = [
  "http",
  "httparse",
  "input_buffer",
- "log 0.4.8",
+ "log 0.4.11",
  "rand 0.7.3",
  "sha-1",
  "url",
@@ -2211,7 +2212,7 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
 dependencies = [
- "log 0.4.8",
+ "log 0.4.11",
  "try-lock",
 ]
@@ -2226,7 +2227,7 @@ dependencies = [
  "headers",
  "http",
  "hyper",
- "log 0.4.8",
+ "log 0.4.11",
  "mime 0.3.16",
  "mime_guess 2.0.3",
  "multipart",
@@ -2265,7 +2266,7 @@ checksum = "ded84f06e0ed21499f6184df0e0cb3494727b0c5da89534e0fcc55c51d812101"
 dependencies = [
  "bumpalo",
  "lazy_static 1.4.0",
- "log 0.4.8",
+ "log 0.4.11",
  "proc-macro2",
  "quote",
  "syn",
diff --git a/Cargo.toml b/Cargo.toml
index 50818ef87..8077e3b35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,7 +22,7 @@ jemallocator = "0.3.2"
 levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] }
 memmap = "0.7.0"
 once_cell = "1.4.0"
-oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "4ca66e5" }
+oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "5426182" }
 rayon = "1.3.1"
 ringtail = "0.3.0"
 roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" }
@@ -33,7 +33,7 @@ structopt = { version = "0.3.14", default-features = false }
 tempfile = "3.1.0"
 
 # logging
-log = "0.4.8"
+log = "0.4.11"
 stderrlog = "0.4.3"
 
 # best proximity
diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index 9104fc22c..ee3880a26 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -10,8 +10,10 @@ use anyhow::Context;
 use arc_cache::ArcCache;
 use bstr::ByteSlice as _;
 use cow_utils::CowUtils;
+use csv::StringRecord;
 use flate2::read::GzDecoder;
 use fst::IntoStreamer;
+use heed::BytesEncode;
 use heed::EnvOpenOptions;
 use heed::types::*;
 use log::{debug, info};
@@ -21,8 +23,9 @@ use rayon::prelude::*;
 use roaring::RoaringBitmap;
 use structopt::StructOpt;
 
-use milli::{SmallVec32, Index, DocumentId, Position, Attribute, BEU32};
+use milli::heed_codec::CsvStringRecordCodec;
 use milli::tokenizer::{simple_tokenizer, only_words};
+use milli::{SmallVec32, Index, DocumentId, Position, Attribute, BEU32};
 
 const LMDB_MAX_KEY_LENGTH: usize = 511;
 const ONE_MILLION: usize = 1_000_000;
@@ -205,13 +208,17 @@ impl Store {
         Self::write_word_attribute_docids(&mut self.sorter, lrus)
     }
 
-    pub fn write_headers(&mut self, headers: &[u8]) -> anyhow::Result<()> {
+    pub fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> {
+        let headers = CsvStringRecordCodec::bytes_encode(headers)
+            .with_context(|| format!("could not encode csv record"))?;
         Ok(self.sorter.insert(HEADERS_KEY, headers)?)
     }
 
-    pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> {
+    pub fn write_document(&mut self, id: DocumentId, record: &StringRecord) -> anyhow::Result<()> {
+        let record = CsvStringRecordCodec::bytes_encode(record)
+            .with_context(|| format!("could not encode csv record"))?;
         self.documents_ids.insert(id);
-        Ok(self.documents_sorter.insert(id.to_be_bytes(), content)?)
+        Ok(self.documents_sorter.insert(id.to_be_bytes(), record)?)
     }
 
     fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
@@ -487,9 +494,6 @@ fn index_csv(
 
     // Write the headers into a Vec of bytes and then into the store.
     let headers = rdr.headers()?;
-    let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-    writer.write_byte_record(headers.as_byte_record())?;
-    let headers = writer.into_inner()?;
     store.write_headers(&headers)?;
 
     let mut before = Instant::now();
@@ -500,7 +504,7 @@ fn index_csv(
         // We skip documents that must not be indexed by this thread.
         if document_id % num_threads == thread_index {
             if document_id % ONE_MILLION == 0 {
-                debug!("We have seen {}m documents so far ({:.02?}).",
+                info!("We have seen {}m documents so far ({:.02?}).",
                     document_id / ONE_MILLION, before.elapsed());
                 before = Instant::now();
             }
@@ -515,9 +519,6 @@ fn index_csv(
         }
 
         // We write the document in the database.
-        let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-        writer.write_byte_record(document.as_byte_record())?;
-        let document = writer.into_inner()?;
         store.write_document(document_id, &document)?;
     }
 
diff --git a/src/bin/search.rs b/src/bin/search.rs
index 52d450f40..8c9e9abdb 100644
--- a/src/bin/search.rs
+++ b/src/bin/search.rs
@@ -1,4 +1,4 @@
-use std::io::{self, Write, BufRead};
+use std::io::{self, BufRead};
 use std::iter::once;
 use std::path::PathBuf;
 use std::time::Instant;
@@ -70,12 +70,12 @@ fn main() -> anyhow::Result<()> {
         };
 
         let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?;
-        let mut stdout = io::stdout();
-        stdout.write_all(&headers)?;
-
-        for (_id, content) in documents {
-            stdout.write_all(&content)?;
+        let mut wtr = csv::Writer::from_writer(io::stdout());
+        wtr.write_record(&headers)?;
+        for (_id, record) in documents {
+            wtr.write_record(&record)?;
         }
+        wtr.flush()?;
 
         debug!("Took {:.02?} to find {} documents", before.elapsed(), result.documents_ids.len());
     }
diff --git a/src/bin/serve.rs b/src/bin/serve.rs
index 3a18023a0..bbb379872 100644
--- a/src/bin/serve.rs
+++ b/src/bin/serve.rs
@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fs::File;
 use std::net::SocketAddr;
@@ -45,20 +44,25 @@ struct Opt {
     http_listen_addr: String,
 }
 
-fn highlight_string(string: &str, words: &HashSet<String>) -> String {
-    let mut output = String::new();
-    for (token_type, token) in simple_tokenizer(string) {
-        if token_type == TokenType::Word {
-            let lowercase_token = token.to_lowercase();
-            let to_highlight = words.contains(&lowercase_token);
-            if to_highlight { output.push_str("<mark>") }
-            output.push_str(token);
-            if to_highlight { output.push_str("</mark>") }
-        } else {
-            output.push_str(token);
+fn highlight_record(record: &csv::StringRecord, words: &HashSet<String>) -> csv::StringRecord {
+    let mut output_record = csv::StringRecord::new();
+    let mut buffer = String::new();
+    for field in record {
+        buffer.clear();
+        for (token_type, token) in simple_tokenizer(field) {
+            if token_type == TokenType::Word {
+                let lowercase_token = token.to_lowercase();
+                let to_highlight = words.contains(&lowercase_token);
+                if to_highlight { buffer.push_str("<mark>") }
+                buffer.push_str(token);
+                if to_highlight { buffer.push_str("</mark>") }
+            } else {
+                buffer.push_str(token);
+            }
         }
+        output_record.push_field(&buffer);
     }
-    output
+    output_record
 }
 
 #[derive(Template)]
@@ -186,23 +190,27 @@ async fn main() -> anyhow::Result<()> {
                     .execute()
                     .unwrap();
 
-                let mut body = Vec::new();
-                if let Some(headers) = index.headers(&rtxn).unwrap() {
-                    // We write the headers
-                    body.extend_from_slice(headers);
-                    let documents = index.documents(&rtxn, documents_ids).unwrap();
+                let body = match index.headers(&rtxn).unwrap() {
+                    Some(headers) => {
+                        let mut wtr = csv::Writer::from_writer(Vec::new());
 
-                    for (_id, content) in documents {
-                        let content = std::str::from_utf8(content.as_ref()).unwrap();
-                        let content = if disable_highlighting {
-                            Cow::from(content)
-                        } else {
-                            Cow::from(highlight_string(content, &found_words))
-                        };
+                        // We write the headers
+                        wtr.write_record(&headers).unwrap();
 
-                        body.extend_from_slice(content.as_bytes());
-                    }
-                }
+                        let documents = index.documents(&rtxn, documents_ids).unwrap();
+                        for (_id, record) in documents {
+                            let record = if disable_highlighting {
+                                record
+                            } else {
+                                highlight_record(&record, &found_words)
+                            };
+                            wtr.write_record(&record).unwrap();
+                        }
+
+                        wtr.into_inner().unwrap()
+                    },
+                    None => Vec::new(),
+                };
 
                 Response::builder()
                     .header("Content-Type", "text/csv")
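A note on the new highlight_record helper above: it rebuilds each record field by field, wrapping every matched word in <mark> tags while copying separators through untouched, and it reuses a single scratch buffer across fields. A minimal sketch of the intended behaviour, not part of the patch: it assumes the patched serve.rs items are in scope, that simple_tokenizer yields words and separators in document order, and the values are made up.

use std::collections::HashSet;

fn highlight_demo() {
    // Query words are lowercased upstream, so the set holds "hello".
    let mut words = HashSet::new();
    words.insert("hello".to_string());

    let record = csv::StringRecord::from(vec!["Hello world", "untouched"]);
    let highlighted = highlight_record(&record, &words);

    // Matching is case-insensitive, but the original casing is kept.
    assert_eq!(&highlighted[0], "<mark>Hello</mark> world");
    assert_eq!(&highlighted[1], "untouched");
}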
.header("Content-Type", "text/csv") diff --git a/src/heed_codec/csv_string_record_codec.rs b/src/heed_codec/csv_string_record_codec.rs new file mode 100644 index 000000000..d444d48a4 --- /dev/null +++ b/src/heed_codec/csv_string_record_codec.rs @@ -0,0 +1,26 @@ +use std::borrow::Cow; +use csv::{StringRecord, Writer, ReaderBuilder}; + +pub struct CsvStringRecordCodec; + +impl heed::BytesDecode<'_> for CsvStringRecordCodec { + type DItem = StringRecord; + + fn bytes_decode(bytes: &[u8]) -> Option { + let mut reader = ReaderBuilder::new() + .has_headers(false) + .buffer_capacity(bytes.len()) // we will just read this record + .from_reader(bytes); + reader.records().next()?.ok() // it return an Option of Result + } +} + +impl heed::BytesEncode<'_> for CsvStringRecordCodec { + type EItem = StringRecord; + + fn bytes_encode(item: &Self::EItem) -> Option> { + let mut writer = Writer::from_writer(Vec::new()); + writer.write_record(item).ok()?; + writer.into_inner().ok().map(Cow::Owned) + } +} diff --git a/src/heed_codec/mod.rs b/src/heed_codec/mod.rs index bb75cdc15..1a0485994 100644 --- a/src/heed_codec/mod.rs +++ b/src/heed_codec/mod.rs @@ -1,5 +1,7 @@ +mod csv_string_record_codec; mod roaring_bitmap_codec; mod str_beu32_codec; +pub use self::csv_string_record_codec::CsvStringRecordCodec; pub use self::roaring_bitmap_codec::RoaringBitmapCodec; pub use self::str_beu32_codec::StrBEU32Codec; diff --git a/src/lib.rs b/src/lib.rs index 8d231e154..5ab8c5769 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,14 @@ use std::collections::HashMap; use std::hash::BuildHasherDefault; use anyhow::Context; +use csv::StringRecord; use fxhash::{FxHasher32, FxHasher64}; use heed::types::*; use heed::{PolyDatabase, Database}; pub use self::search::{Search, SearchResult}; pub use self::criterion::{Criterion, default_criteria}; -use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec}; +use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec, CsvStringRecordCodec}; pub type FastMap4 = HashMap>; pub type FastMap8 = HashMap>; @@ -59,21 +60,17 @@ impl Index { }) } - pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> { - Ok(self.main.put::<_, Str, ByteSlice>(wtxn, HEADERS_KEY, headers)?) + pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &StringRecord) -> heed::Result<()> { + self.main.put::<_, Str, CsvStringRecordCodec>(wtxn, HEADERS_KEY, headers) } - pub fn headers<'t>(&self, rtxn: &'t heed::RoTxn) -> heed::Result> { - self.main.get::<_, Str, ByteSlice>(rtxn, HEADERS_KEY) + pub fn headers(&self, rtxn: &heed::RoTxn) -> heed::Result> { + self.main.get::<_, Str, CsvStringRecordCodec>(rtxn, HEADERS_KEY) } - pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result> { + pub fn number_of_attributes(&self, rtxn: &heed::RoTxn) -> anyhow::Result> { match self.headers(rtxn)? { - Some(headers) => { - let mut rdr = csv::Reader::from_reader(headers); - let headers = rdr.headers()?; - Ok(Some(headers.len())) - } + Some(headers) => Ok(Some(headers.len())), None => Ok(None), } } @@ -94,13 +91,25 @@ impl Index { &self, rtxn: &'t heed::RoTxn, iter: impl IntoIterator, - ) -> anyhow::Result)>> + ) -> anyhow::Result> { - iter.into_iter().map(|id| { - let content = self.documents.get(rtxn, &BEU32::new(id))? + let ids: Vec<_> = iter.into_iter().collect(); + let mut content = Vec::new(); + + for id in ids.iter().cloned() { + let document_content = self.documents.get(rtxn, &BEU32::new(id))? 
diff --git a/src/lib.rs b/src/lib.rs
index 8d231e154..5ab8c5769 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,13 +9,14 @@ use std::collections::HashMap;
 use std::hash::BuildHasherDefault;
 
 use anyhow::Context;
+use csv::StringRecord;
 use fxhash::{FxHasher32, FxHasher64};
 use heed::types::*;
 use heed::{PolyDatabase, Database};
 
 pub use self::search::{Search, SearchResult};
 pub use self::criterion::{Criterion, default_criteria};
-use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec};
+use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec, CsvStringRecordCodec};
 
 pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
 pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
@@ -59,21 +60,17 @@ impl Index {
         })
     }
 
-    pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> {
-        Ok(self.main.put::<_, Str, ByteSlice>(wtxn, HEADERS_KEY, headers)?)
+    pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &StringRecord) -> heed::Result<()> {
+        self.main.put::<_, Str, CsvStringRecordCodec>(wtxn, HEADERS_KEY, headers)
     }
 
-    pub fn headers<'t>(&self, rtxn: &'t heed::RoTxn) -> heed::Result<Option<&'t [u8]>> {
-        self.main.get::<_, Str, ByteSlice>(rtxn, HEADERS_KEY)
+    pub fn headers(&self, rtxn: &heed::RoTxn) -> heed::Result<Option<StringRecord>> {
+        self.main.get::<_, Str, CsvStringRecordCodec>(rtxn, HEADERS_KEY)
     }
 
-    pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
+    pub fn number_of_attributes(&self, rtxn: &heed::RoTxn) -> anyhow::Result<Option<usize>> {
         match self.headers(rtxn)? {
-            Some(headers) => {
-                let mut rdr = csv::Reader::from_reader(headers);
-                let headers = rdr.headers()?;
-                Ok(Some(headers.len()))
-            }
+            Some(headers) => Ok(Some(headers.len())),
             None => Ok(None),
         }
     }
@@ -94,13 +91,25 @@ impl Index {
         &self,
         rtxn: &'t heed::RoTxn,
         iter: impl IntoIterator<Item=DocumentId>,
-    ) -> anyhow::Result<Vec<(DocumentId, Vec<u8>)>>
+    ) -> anyhow::Result<Vec<(DocumentId, StringRecord)>>
     {
-        iter.into_iter().map(|id| {
-            let content = self.documents.get(rtxn, &BEU32::new(id))?
+        let ids: Vec<_> = iter.into_iter().collect();
+        let mut content = Vec::new();
+
+        for id in ids.iter().cloned() {
+            let document_content = self.documents.get(rtxn, &BEU32::new(id))?
                 .with_context(|| format!("Could not find document {}", id))?;
-            Ok((id, content.to_vec()))
-        }).collect()
+            content.extend_from_slice(document_content);
+        }
+
+        let mut rdr = csv::ReaderBuilder::new().has_headers(false).from_reader(&content[..]);
+
+        let mut documents = Vec::with_capacity(ids.len());
+        for (id, result) in ids.into_iter().zip(rdr.records()) {
+            documents.push((id, result?));
+        }
+
+        Ok(documents)
     }
 
     /// Returns the number of documents indexed in the database.
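End to end, the new read path looks roughly like this: a sketch assuming an open index and rtxn, with made-up document ids.

use std::io;
use anyhow::Context as _;

fn dump_documents(index: &milli::Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
    let headers = index.headers(rtxn)?.context("database contains no headers")?;
    let documents = index.documents(rtxn, vec![0u32, 1, 2])?;

    // Records are re-serialized through csv::Writer, as in search.rs above,
    // instead of raw bytes being copied to the output.
    let mut wtr = csv::Writer::from_writer(io::stdout());
    wtr.write_record(&headers)?;
    for (_id, record) in documents {
        wtr.write_record(&record)?;
    }
    Ok(wtr.flush()?)
}

Note the implicit contract in Index::documents: ids.into_iter().zip(rdr.records()) stays aligned only if every stored blob parses back into exactly one record, which holds because the codec writes one terminated CSV line per document.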