From a7a4984175299f51cde247f14b88f9400e745128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sat, 24 Oct 2020 16:23:08 +0200 Subject: [PATCH] Introduce the Transform type into the indexing system --- src/index.rs | 12 ++-- src/indexing/mod.rs | 122 +++++++++++++++++++++----------------- src/indexing/store.rs | 82 +++++++------------------ src/indexing/transform.rs | 33 ++++++----- src/main.rs | 3 - src/subcommand/indexer.rs | 67 --------------------- src/subcommand/mod.rs | 1 - src/subcommand/serve.rs | 104 ++++++++++++++++++-------------- 8 files changed, 173 insertions(+), 251 deletions(-) delete mode 100644 src/subcommand/indexer.rs diff --git a/src/index.rs b/src/index.rs index 861cf99bd..f00f35ec8 100644 --- a/src/index.rs +++ b/src/index.rs @@ -14,7 +14,7 @@ use crate::{ pub const WORDS_FST_KEY: &str = "words-fst"; pub const FIELDS_IDS_MAP_KEY: &str = "fields-ids-map"; pub const DOCUMENTS_IDS_KEY: &str = "documents-ids"; -pub const USER_IDS_DOCUMENTS_IDS_KEY: &str = "user-ids-documents-ids"; +pub const USERS_IDS_DOCUMENTS_IDS_KEY: &str = "users-ids-documents-ids"; #[derive(Clone)] pub struct Index { @@ -51,16 +51,16 @@ impl Index { self.main.get::<_, Str, RoaringBitmapCodec>(rtxn, DOCUMENTS_IDS_KEY) } - /// Writes the user ids documents ids, a user id is a byte slice (i.e. `[u8]`) + /// Writes the users ids documents ids, a user id is a byte slice (i.e. `[u8]`) /// and refers to an internal id (i.e. `u32`). - pub fn put_user_ids_documents_ids>(&self, wtxn: &mut heed::RwTxn, fst: &fst::Set) -> heed::Result<()> { - self.main.put::<_, Str, ByteSlice>(wtxn, USER_IDS_DOCUMENTS_IDS_KEY, fst.as_fst().as_bytes()) + pub fn put_users_ids_documents_ids>(&self, wtxn: &mut heed::RwTxn, fst: &fst::Map) -> heed::Result<()> { + self.main.put::<_, Str, ByteSlice>(wtxn, USERS_IDS_DOCUMENTS_IDS_KEY, fst.as_fst().as_bytes()) } /// Returns the user ids documents ids map which associate the user ids (i.e. `[u8]`) /// with the internal ids (i.e. `u32`). - pub fn user_ids_documents_ids<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result>> { - match self.main.get::<_, Str, ByteSlice>(rtxn, USER_IDS_DOCUMENTS_IDS_KEY)? { + pub fn users_ids_documents_ids<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result>> { + match self.main.get::<_, Str, ByteSlice>(rtxn, USERS_IDS_DOCUMENTS_IDS_KEY)? { Some(bytes) => Ok(Some(fst::Map::new(bytes)?)), None => Ok(None), } diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs index e98ff1ce5..471fb891c 100644 --- a/src/indexing/mod.rs +++ b/src/indexing/mod.rs @@ -1,26 +1,29 @@ use std::borrow::Cow; use std::fs::File; -use std::io::{self, Read, Seek, SeekFrom}; +use std::io::{self, Seek, SeekFrom}; use std::sync::mpsc::sync_channel; use std::time::Instant; use anyhow::Context; use bstr::ByteSlice as _; -use flate2::read::GzDecoder; use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use heed::types::ByteSlice; use log::{debug, info, error}; use rayon::prelude::*; +use roaring::RoaringBitmap; use structopt::StructOpt; use tempfile::tempfile; -use crate::Index; +use crate::FieldsIdsMap; +use crate::index::Index; use self::store::Store; use self::merge_function::{ main_merge, word_docids_merge, words_pairs_proximities_docids_merge, docid_word_positions_merge, documents_merge, }; +pub use self::transform::{Transform, TransformOutput}; + mod merge_function; mod store; mod transform; @@ -30,11 +33,11 @@ pub struct IndexerOpt { /// The amount of documents to skip before printing /// a log regarding the indexing advancement. 
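The `index.rs` hunk above turns the `user-ids-documents-ids` entry from an `fst::Set` into an `fst::Map`, so every external (user-provided) document id now points at an internal `u32` id. A minimal sketch of how such a map is built and read back with the `fst` crate, mirroring the `put_users_ids_documents_ids` / `users_ids_documents_ids` accessors (the keys and ids below are invented for illustration):

```rust
use fst::{Map, MapBuilder};

fn main() -> anyhow::Result<()> {
    // Keys must be inserted in lexicographic order; values are u64, so the
    // internal u32 ids are widened on insert and narrowed on lookup.
    let mut builder = MapBuilder::memory();
    builder.insert("doc-abc", 0)?;
    builder.insert("doc-def", 1)?;
    let map = builder.into_map();

    // These are the bytes that `put_users_ids_documents_ids` writes into LMDB...
    let bytes = map.as_fst().as_bytes().to_vec();

    // ...and this is how `users_ids_documents_ids` rebuilds the map from them.
    let map = Map::new(bytes)?;
    assert_eq!(map.get("doc-def").map(|id| id as u32), Some(1));
    Ok(())
}
```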
#[structopt(long, default_value = "1000000")] // 1m - log_every_n: usize, + pub log_every_n: usize, /// MTBL max number of chunks in bytes. #[structopt(long)] - max_nb_chunks: Option, + pub max_nb_chunks: Option, /// The maximum amount of memory to use for the MTBL buffer. It is recommended /// to use something like 80%-90% of the available memory. @@ -42,23 +45,23 @@ pub struct IndexerOpt { /// It is automatically split by the number of jobs e.g. if you use 7 jobs /// and 7 GB of max memory, each thread will use a maximum of 1 GB. #[structopt(long, default_value = "7516192768")] // 7 GB - max_memory: usize, + pub max_memory: usize, /// Size of the linked hash map cache when indexing. /// The bigger it is, the faster the indexing is but the more memory it takes. #[structopt(long, default_value = "500")] - linked_hash_map_size: usize, + pub linked_hash_map_size: usize, /// The name of the compression algorithm to use when compressing intermediate /// chunks during indexing documents. /// /// Choosing a fast algorithm will make the indexing faster but may consume more memory. #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] - chunk_compression_type: CompressionType, + pub chunk_compression_type: CompressionType, /// The level of compression of the chosen algorithm. #[structopt(long, requires = "chunk-compression-type")] - chunk_compression_level: Option, + pub chunk_compression_level: Option, /// The number of bytes to remove from the begining of the chunks while reading/sorting /// or merging them. @@ -66,15 +69,15 @@ pub struct IndexerOpt { /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set. #[structopt(long, default_value = "4294967296")] // 4 GB - chunk_fusing_shrink_size: u64, + pub chunk_fusing_shrink_size: u64, /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. #[structopt(long)] - enable_chunk_fusing: bool, + pub enable_chunk_fusing: bool, /// Number of parallel jobs for indexing, defaults to # of CPUs. 
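The `max_memory` doc comment above describes a global budget that is split across the indexing jobs (7 GB and 7 jobs giving 1 GB per job). The exact split performed by `run_intern` is not visible in this hunk, so the following is only an illustration of that documented behavior, using the default value from the attribute above:

```rust
/// Illustration of the documented behavior: divide the global memory budget
/// by the number of indexing jobs to get a per-job maximum.
fn max_memory_by_job(max_memory: u64, jobs: u64) -> u64 {
    max_memory / jobs.max(1)
}

fn main() {
    // The 7 GB default shared by 7 jobs gives 1 GB per job.
    assert_eq!(max_memory_by_job(7_516_192_768, 7), 1_073_741_824);
}
```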
#[structopt(long)] - indexing_jobs: Option, + pub indexing_jobs: Option, } #[derive(Debug, Copy, Clone)] @@ -210,51 +213,48 @@ fn write_into_lmdb_database( Ok(()) } -fn csv_bytes_readers<'a>( - content: &'a [u8], - gzipped: bool, - count: usize, -) -> Vec>> -{ - let mut readers = Vec::new(); - - for _ in 0..count { - let content = if gzipped { - Box::new(GzDecoder::new(content)) as Box - } else { - Box::new(content) as Box - }; - let reader = csv::Reader::from_reader(content); - readers.push(reader); - } - - readers -} - -pub fn run<'a, F>( +pub fn run( env: &heed::Env, index: &Index, opt: &IndexerOpt, - content: &'a [u8], - gzipped: bool, + fields_ids_map: FieldsIdsMap, + users_ids_documents_ids: fst::Map>, + new_documents_ids: RoaringBitmap, + documents: grenad::Reader<&[u8]>, + documents_count: u32, progress_callback: F, ) -> anyhow::Result<()> -where F: Fn(u32) + Sync + Send, +where F: Fn(u32, u32) + Sync + Send, { let jobs = opt.indexing_jobs.unwrap_or(0); let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; - pool.install(|| run_intern(env, index, opt, content, gzipped, progress_callback)) + pool.install(|| { + run_intern( + env, + index, + opt, + fields_ids_map, + users_ids_documents_ids, + new_documents_ids, + documents, + documents_count, + progress_callback, + ) + }) } -fn run_intern<'a, F>( +fn run_intern( env: &heed::Env, index: &Index, opt: &IndexerOpt, - content: &'a [u8], - gzipped: bool, + fields_ids_map: FieldsIdsMap, + users_ids_documents_ids: fst::Map>, + new_documents_ids: RoaringBitmap, + documents: grenad::Reader<&[u8]>, + documents_count: u32, progress_callback: F, ) -> anyhow::Result<()> -where F: Fn(u32) + Sync + Send, +where F: Fn(u32, u32) + Sync + Send, { let before_indexing = Instant::now(); let num_threads = rayon::current_num_threads(); @@ -271,14 +271,9 @@ where F: Fn(u32) + Sync + Send, None }; - let rtxn = env.read_txn()?; - let number_of_documents = index.number_of_documents(&rtxn)?; - drop(rtxn); - - let readers = csv_bytes_readers(content, gzipped, num_threads) - .into_par_iter() + let readers = rayon::iter::repeatn(documents, num_threads) .enumerate() - .map(|(i, rdr)| { + .map(|(i, documents)| { let store = Store::new( linked_hash_map_size, max_nb_chunks, @@ -287,8 +282,7 @@ where F: Fn(u32) + Sync + Send, chunk_compression_level, chunk_fusing_shrink_size, )?; - let base_document_id = number_of_documents; - store.index_csv(rdr, base_document_id, i, num_threads, log_every_n, &progress_callback) + store.index(documents, documents_count, i, num_threads, log_every_n, &progress_callback) }) .collect::, _>>()?; @@ -341,10 +335,32 @@ where F: Fn(u32) + Sync + Send, }); }); + // We create the write transaction of this update. + // TODO we must get this transaction as an argument to be able + // to first delete the replaced documents for example. let mut wtxn = env.write_txn()?; - let contains_documents = number_of_documents != 0; - let write_method = if contains_documents { WriteMethod::GetMergePut } else { WriteMethod::Append }; + let contains_documents = index.documents_ids(&wtxn)?.map_or(false, |docids| !docids.is_empty()); + let write_method = if contains_documents { + WriteMethod::GetMergePut + } else { + WriteMethod::Append + }; + + // We write the fields ids map into the main database + index.put_fields_ids_map(&mut wtxn, &fields_ids_map)?; + + // We write the users_ids_documents_ids into the main database. 
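In the new `run_intern` above, every worker receives the same grenad `documents` reader through `rayon::iter::repeatn`, and each `Store` later skips the entries that do not belong to its thread index (see the `count % num_threads == thread_index` check further down). A self-contained sketch of that sharding pattern, using plain strings instead of the real reader and counting instead of indexing:

```rust
use rayon::prelude::*;

/// Every worker gets the full document stream and only handles the entries
/// whose position matches its own thread index, like `Store::index` does.
fn documents_per_thread(documents: Vec<&str>, num_threads: usize) -> Vec<usize> {
    rayon::iter::repeatn(documents, num_threads)
        .enumerate()
        .map(|(thread_index, documents)| {
            documents.iter()
                .enumerate()
                .filter(|&(position, _)| position % num_threads == thread_index)
                .count()
        })
        .collect()
}

fn main() {
    let docs = vec!["a", "b", "c", "d", "e"];
    // Five documents shared by two workers: positions 0, 2, 4 and 1, 3.
    assert_eq!(documents_per_thread(docs, 2), vec![3, 2]);
}
```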
+ index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?; + + // We merge the new documents ids with the existing ones. + match index.documents_ids(&wtxn)? { + Some(mut documents_ids) => { + documents_ids.union_with(&new_documents_ids); + index.put_documents_ids(&mut wtxn, &documents_ids)?; + }, + None => index.put_documents_ids(&mut wtxn, &new_documents_ids)?, + } debug!("Writing the docid word positions into LMDB on disk..."); merge_into_lmdb_database( diff --git a/src/indexing/store.rs b/src/indexing/store.rs index 28564a1e0..a6d2afa95 100644 --- a/src/indexing/store.rs +++ b/src/indexing/store.rs @@ -1,14 +1,13 @@ +use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; use std::convert::{TryFrom, TryInto}; use std::fs::File; -use std::io::Read; use std::iter::FromIterator; use std::time::Instant; use std::{cmp, iter}; use anyhow::Context; use bstr::ByteSlice as _; -use csv::StringRecord; use heed::BytesEncode; use linked_hash_map::LinkedHashMap; use log::{debug, info}; @@ -16,7 +15,6 @@ use grenad::{Reader, FileFuse, Writer, Sorter, CompressionType}; use roaring::RoaringBitmap; use tempfile::tempfile; -use crate::fields_ids_map::FieldsIdsMap; use crate::heed_codec::{BoRoaringBitmapCodec, CboRoaringBitmapCodec}; use crate::tokenizer::{simple_tokenizer, only_token}; use crate::{SmallVec32, Position, DocumentId}; @@ -28,11 +26,7 @@ const LMDB_MAX_KEY_LENGTH: usize = 511; const ONE_KILOBYTE: usize = 1024 * 1024; const MAX_POSITION: usize = 1000; -const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; - const WORDS_FST_KEY: &[u8] = crate::index::WORDS_FST_KEY.as_bytes(); -const FIELDS_IDS_MAP_KEY: &[u8] = crate::index::FIELDS_IDS_MAP_KEY.as_bytes(); -const DOCUMENTS_IDS_KEY: &[u8] = crate::index::DOCUMENTS_IDS_KEY.as_bytes(); pub struct Readers { pub main: Reader, @@ -47,7 +41,6 @@ pub struct Store { word_docids_limit: usize, words_pairs_proximities_docids: LinkedHashMap<(SmallVec32, SmallVec32, u8), RoaringBitmap>, words_pairs_proximities_docids_limit: usize, - documents_ids: RoaringBitmap, // MTBL parameters chunk_compression_type: CompressionType, chunk_compression_level: Option, @@ -111,7 +104,6 @@ impl Store { word_docids_limit: linked_hash_map_size, words_pairs_proximities_docids: LinkedHashMap::with_capacity(linked_hash_map_size), words_pairs_proximities_docids_limit: linked_hash_map_size, - documents_ids: RoaringBitmap::new(), chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -183,17 +175,11 @@ impl Store { Ok(()) } - fn write_fields_ids_map(&mut self, map: &FieldsIdsMap) -> anyhow::Result<()> { - let bytes = serde_json::to_vec(&map)?; - self.main_sorter.insert(FIELDS_IDS_MAP_KEY, bytes)?; - Ok(()) - } - fn write_document( &mut self, document_id: DocumentId, words_positions: &HashMap>, - record: &StringRecord, + record: &[u8], ) -> anyhow::Result<()> { // We compute the list of words pairs proximities (self-join) and write it directly to disk. 
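With this change `write_document` no longer encodes a CSV `StringRecord` itself: it receives an already-encoded obkv record, produced upstream by the `Transform`, and stores it verbatim. A small sketch of what such a record looks like, mirroring the removed encoding code and the `obkv::KvReader` usage below (the field contents are invented):

```rust
use std::borrow::Cow;

fn main() -> anyhow::Result<()> {
    // Roughly what the Transform hands to the Store: an obkv record mapping
    // field ids to JSON-encoded values.
    let fields = ["31415", "Ascending and Descending"];

    let mut writer = obkv::KvWriter::memory();
    for (field_id, value) in fields.iter().enumerate() {
        let field_id = field_id.try_into()?;
        let json = serde_json::to_vec(value)?;
        writer.insert(field_id, json.as_slice())?;
    }
    let record = writer.into_inner()?;

    // Decode it back the way `Store::index` does.
    let reader = obkv::KvReader::new(record.as_slice());
    for (field_id, content) in reader.iter() {
        let content: Cow<str> = serde_json::from_slice(content)?;
        println!("field {}: {}", field_id, content);
    }
    Ok(())
}
```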
@@ -205,15 +191,7 @@ impl Store { self.insert_word_docid(word, document_id)?; } - let mut writer = obkv::KvWriter::memory(); - record.iter().enumerate().for_each(|(i, v)| { - let key = i.try_into().unwrap(); - writer.insert(key, v.as_bytes()).unwrap(); - }); - let bytes = writer.into_inner().unwrap(); - - self.documents_ids.insert(document_id); - self.documents_writer.insert(document_id.to_be_bytes(), bytes)?; + self.documents_writer.insert(document_id.to_be_bytes(), record)?; Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?; Ok(()) @@ -299,70 +277,55 @@ impl Store { Ok(()) } - fn write_documents_ids(sorter: &mut Sorter, ids: RoaringBitmap) -> anyhow::Result<()> { - let mut buffer = Vec::with_capacity(ids.serialized_size()); - ids.serialize_into(&mut buffer)?; - sorter.insert(DOCUMENTS_IDS_KEY, &buffer)?; - Ok(()) - } - - pub fn index_csv<'a, F>( + pub fn index( mut self, - mut rdr: csv::Reader>, - base_document_id: usize, + mut documents: grenad::Reader<&[u8]>, + documents_count: u32, thread_index: usize, num_threads: usize, log_every_n: usize, mut progress_callback: F, ) -> anyhow::Result - where F: FnMut(u32), + where F: FnMut(u32, u32), { debug!("{:?}: Indexing in a Store...", thread_index); - // Write the headers into the store. - let headers = rdr.headers()?; - - let mut fields_ids_map = FieldsIdsMap::new(); - for header in headers.iter() { - fields_ids_map.insert(header).context("no more field id available")?; - } - self.write_fields_ids_map(&fields_ids_map)?; - let mut before = Instant::now(); - let mut document_id: usize = base_document_id; - let mut document = csv::StringRecord::new(); let mut words_positions = HashMap::new(); - while rdr.read_record(&mut document)? { + let mut count: usize = 0; + while let Some((key, value)) = documents.next()? { + let document_id = key.try_into().map(u32::from_be_bytes).unwrap(); + let document = obkv::KvReader::new(value); + // We skip documents that must not be indexed by this thread. - if document_id % num_threads == thread_index { + if count % num_threads == thread_index { // This is a log routine that we do every `log_every_n` documents. - if document_id % log_every_n == 0 { - let count = format_count(document_id); - info!("We have seen {} documents so far ({:.02?}).", count, before.elapsed()); - progress_callback((document_id - base_document_id) as u32); + if count % log_every_n == 0 { + info!("We have seen {} documents so far ({:.02?}).", format_count(count), before.elapsed()); + progress_callback(count as u32, documents_count); before = Instant::now(); } - let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; - for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { + for (attr, content) in document.iter() { + let content: Cow = serde_json::from_slice(content).unwrap(); for (pos, token) in simple_tokenizer(&content).filter_map(only_token).enumerate().take(MAX_POSITION) { let word = token.to_lowercase(); - let position = (attr * MAX_POSITION + pos) as u32; + let position = (attr as usize * MAX_POSITION + pos) as u32; words_positions.entry(word).or_insert_with(SmallVec32::new).push(position); } } // We write the document in the documents store. - self.write_document(document_id, &words_positions, &document)?; + self.write_document(document_id, &words_positions, value)?; words_positions.clear(); } // Compute the document id of the next document. 
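Word positions keep the attribute-based packing used above: each field may hold at most `MAX_POSITION` token positions, and the absolute position is `attr * MAX_POSITION + pos` (the enumerator is capped with `.take(MAX_POSITION)` so `pos` never spills into the next field). A tiny illustration of packing and unpacking such positions:

```rust
const MAX_POSITION: usize = 1000;

/// Pack a (field id, in-field token position) pair the way `Store::index` does.
fn absolute_position(attr: u32, pos: usize) -> u32 {
    (attr as usize * MAX_POSITION + pos) as u32
}

/// Recover the field id and the in-field position from a packed position.
fn field_and_position(position: u32) -> (u32, u32) {
    (position / MAX_POSITION as u32, position % MAX_POSITION as u32)
}

fn main() {
    // The third token (pos 2) of field 4 lands at absolute position 4002.
    let packed = absolute_position(4, 2);
    assert_eq!(packed, 4002);
    assert_eq!(field_and_position(packed), (4, 2));
}
```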
- document_id = document_id + 1; + count = count + 1; } - progress_callback((document_id - base_document_id) as u32); + progress_callback(count as u32, documents_count); let readers = self.finish()?; debug!("{:?}: Store created!", thread_index); @@ -375,7 +338,6 @@ impl Store { let shrink_size = self.chunk_fusing_shrink_size; Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?; - Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?; Self::write_words_pairs_proximities( &mut self.words_pairs_proximities_docids_sorter, self.words_pairs_proximities_docids, diff --git a/src/indexing/transform.rs b/src/indexing/transform.rs index f5187c2df..9e5b1ae6c 100644 --- a/src/indexing/transform.rs +++ b/src/indexing/transform.rs @@ -6,9 +6,11 @@ use std::io::{Read, Seek, SeekFrom}; use anyhow::Context; use crate::{FieldsIdsMap, AvailableDocumentsIds}; use fst::{IntoStreamer, Streamer}; -use grenad::{Writer, Sorter, CompressionType}; +use grenad::CompressionType; use roaring::RoaringBitmap; +use super::{create_writer, create_sorter}; + pub struct TransformOutput { pub fields_ids_map: FieldsIdsMap, pub users_ids_documents_ids: fst::Map>, @@ -22,9 +24,11 @@ pub struct Transform { pub fields_ids_map: FieldsIdsMap, pub available_documents_ids: AvailableDocumentsIds, pub users_ids_documents_ids: fst::Map, - pub compression_type: CompressionType, - pub compression_level: u32, - pub enable_file_fuzing: bool, + pub chunk_compression_type: CompressionType, + pub chunk_compression_level: Option, + pub chunk_fusing_shrink_size: Option, + pub max_nb_chunks: Option, + pub max_memory: Option, } impl> Transform { @@ -53,16 +57,17 @@ impl> Transform { } // We initialize the sorter with the user indexing settings. - let mut sorter_builder = Sorter::builder(merge_last_win); - sorter_builder.chunk_compression_type(self.compression_type); - sorter_builder.chunk_compression_level(self.compression_level); - if self.enable_file_fuzing { - sorter_builder.enable_fusing(); - } + let mut sorter = create_sorter( + merge_last_win, + self.chunk_compression_type, + self.chunk_compression_level, + self.chunk_fusing_shrink_size, + self.max_nb_chunks, + self.max_memory, + ); // We write into the sorter to merge and deduplicate the documents // based on the users ids. - let mut sorter = sorter_builder.build(); let mut json_buffer = Vec::new(); let mut obkv_buffer = Vec::new(); let mut record = csv::StringRecord::new(); @@ -88,11 +93,7 @@ impl> Transform { // Once we have sort and deduplicated the documents we write them into a final file. 
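The sorter above is built with the `merge_last_win` merge function which, as its name suggests, should keep the most recently written record when several share the same user id; its real definition lives elsewhere in the crate and its signature is dictated by grenad. A hypothetical stand-in that only illustrates the idea:

```rust
/// Hypothetical stand-in for `merge_last_win`: when the sorter sees several
/// records for the same user id, keep the last one written. The real
/// function's signature may differ.
fn merge_last_win(_user_id: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
    values.last().cloned().ok_or(())
}

fn main() {
    let first = b"old version of the document".to_vec();
    let second = b"new version of the document".to_vec();
    let merged = merge_last_win(b"doc-31415", &[first, second]).unwrap();
    assert_eq!(merged, b"new version of the document".to_vec());
}
```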
let file = tempfile::tempfile()?; - let mut writer_builder = Writer::builder(); - writer_builder.compression_type(self.compression_type); - writer_builder.compression_level(self.compression_level); - - let mut writer = writer_builder.build(file)?; + let mut writer = create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?; let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory(); let mut replaced_documents_ids = RoaringBitmap::new(); let mut new_documents_ids = RoaringBitmap::new(); diff --git a/src/main.rs b/src/main.rs index 436109469..75169214e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ use structopt::StructOpt; -use milli::subcommand::indexer::{self, Opt as IndexerOpt}; use milli::subcommand::infos::{self, Opt as InfosOpt}; use milli::subcommand::serve::{self, Opt as ServeOpt}; use milli::subcommand::search::{self, Opt as SearchOpt}; @@ -13,7 +12,6 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; #[structopt(name = "milli", about = "The milli project.")] enum Command { Serve(ServeOpt), - Indexer(IndexerOpt), Infos(InfosOpt), Search(SearchOpt), } @@ -21,7 +19,6 @@ enum Command { fn main() -> anyhow::Result<()> { match Command::from_args() { Command::Serve(opt) => serve::run(opt), - Command::Indexer(opt) => indexer::run(opt), Command::Infos(opt) => infos::run(opt), Command::Search(opt) => search::run(opt), } diff --git a/src/subcommand/indexer.rs b/src/subcommand/indexer.rs deleted file mode 100644 index ba0a03c41..000000000 --- a/src/subcommand/indexer.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::fs::File; -use std::path::PathBuf; - -use anyhow::bail; -use heed::EnvOpenOptions; -use structopt::StructOpt; - -use crate::indexing::{self, IndexerOpt}; -use crate::Index; - -#[derive(Debug, StructOpt)] -#[structopt(name = "milli-indexer")] -/// The indexer binary of the milli project. -pub struct Opt { - /// The database path where the database is located. - /// It is created if it doesn't already exist. - #[structopt(long = "db", parse(from_os_str))] - database: PathBuf, - - /// The maximum size the database can take on disk. It is recommended to specify - /// the whole disk space (value must be a multiple of a page size). - #[structopt(long = "db-size", default_value = "107374182400")] // 100 GB - database_size: usize, - - #[structopt(flatten)] - indexer: IndexerOpt, - - /// Verbose mode (-v, -vv, -vvv, etc.) - #[structopt(short, long, parse(from_occurrences))] - verbose: usize, - - /// CSV file to index, if unspecified the CSV is read from standard input. - /// - /// You can also provide a ".gz" or ".gzip" CSV file, the indexer will figure out - /// how to decode and read it. - /// - /// Note that it is much faster to index from a file as when the indexer reads from stdin - /// it will dedicate a thread for that and context switches could slow down the indexing jobs. 
- csv_file: Option, -} - -pub fn run(opt: Opt) -> anyhow::Result<()> { - stderrlog::new() - .verbosity(opt.verbose) - .show_level(false) - .timestamp(stderrlog::Timestamp::Off) - .init()?; - - if opt.database.exists() { - bail!("Database ({}) already exists, delete it to continue.", opt.database.display()); - } - - std::fs::create_dir_all(&opt.database)?; - let env = EnvOpenOptions::new() - .map_size(opt.database_size) - .max_dbs(10) - .open(&opt.database)?; - - let index = Index::new(&env)?; - - let file_path = opt.csv_file.unwrap(); - let gzipped = file_path.extension().map_or(false, |e| e == "gz" || e == "gzip"); - let file = File::open(file_path)?; - let content = unsafe { memmap::Mmap::map(&file)? }; - - indexing::run(&env, &index, &opt.indexer, &content, gzipped, |_docid| { }) -} diff --git a/src/subcommand/mod.rs b/src/subcommand/mod.rs index c7864c565..1c4e9620d 100644 --- a/src/subcommand/mod.rs +++ b/src/subcommand/mod.rs @@ -1,4 +1,3 @@ -pub mod indexer; pub mod infos; pub mod search; pub mod serve; diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 961ac2a81..bdf601710 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -1,6 +1,7 @@ +use std::borrow::Cow; use std::collections::HashSet; use std::fs::{File, create_dir_all}; -use std::mem; +use std::{mem, io}; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; @@ -8,6 +9,7 @@ use std::sync::Arc; use std::time::Instant; use askama_warp::Template; +use flate2::read::GzDecoder; use futures::stream; use futures::{FutureExt, StreamExt}; use heed::EnvOpenOptions; @@ -20,9 +22,9 @@ use tokio::sync::broadcast; use warp::filters::ws::Message; use warp::{Filter, http::Response}; -use crate::indexing::{self, IndexerOpt}; +use crate::indexing::{self, IndexerOpt, Transform, TransformOutput}; use crate::tokenizer::{simple_tokenizer, TokenType}; -use crate::{Index, UpdateStore, SearchResult}; +use crate::{Index, UpdateStore, SearchResult, AvailableDocumentsIds}; #[derive(Debug, StructOpt)] /// The HTTP main server of the milli project. @@ -103,9 +105,7 @@ enum UpdateStatus { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] enum UpdateMeta { - DocumentsAddition { - total_number_of_documents: Option, - }, + DocumentsAddition, DocumentsAdditionFromPath { path: PathBuf, }, @@ -153,19 +153,63 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { update_store_path, move |update_id, meta, content| { let result = match meta { - UpdateMeta::DocumentsAddition { total_number_of_documents } => { + UpdateMeta::DocumentsAddition => { + // We must use the write transaction of the update here. 
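In the update handler that follows, the users-ids map read from the index borrows its bytes from LMDB while the empty fallback owns them, so both branches are converted to a single `fst::Map<Cow<[u8]>>` with `map_data` before being handed to the `Transform`. A standalone sketch of that conversion:

```rust
use std::borrow::Cow;

/// Unify a map borrowed from the index and an owned empty fallback into one
/// `fst::Map<Cow<[u8]>>`, as the DocumentsAddition handler below does.
fn users_ids_documents_ids<'a>(
    from_index: Option<fst::Map<&'a [u8]>>,
) -> fst::Map<Cow<'a, [u8]>> {
    match from_index {
        Some(map) => map.map_data(Cow::Borrowed).unwrap(),
        None => fst::Map::default().map_data(Cow::Owned).unwrap(),
    }
}

fn main() {
    let empty = users_ids_documents_ids(None);
    assert_eq!(empty.len(), 0);
}
```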
+ let rtxn = env_cloned.read_txn()?; + let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?.unwrap_or_default(); + let documents_ids = index_cloned.documents_ids(&rtxn)?.unwrap_or_default(); + let available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); + let users_ids_documents_ids = match index_cloned.users_ids_documents_ids(&rtxn).unwrap() { + Some(map) => map.map_data(Cow::Borrowed).unwrap(), + None => fst::Map::default().map_data(Cow::Owned).unwrap(), + }; + + let transform = Transform { + fields_ids_map, + available_documents_ids, + users_ids_documents_ids, + chunk_compression_type: indexer_opt_cloned.chunk_compression_type, + chunk_compression_level: indexer_opt_cloned.chunk_compression_level, + chunk_fusing_shrink_size: Some(indexer_opt_cloned.chunk_fusing_shrink_size), + max_nb_chunks: indexer_opt_cloned.max_nb_chunks, + max_memory: Some(indexer_opt_cloned.max_memory), + }; + let gzipped = false; + let reader = if gzipped { + Box::new(GzDecoder::new(content)) + } else { + Box::new(content) as Box + }; + + let TransformOutput { + fields_ids_map, + users_ids_documents_ids, + new_documents_ids, + replaced_documents_ids, + documents_count, + documents_file, + } = transform.from_csv(reader).unwrap(); + + drop(rtxn); + + let mmap = unsafe { memmap::Mmap::map(&documents_file)? }; + let documents = grenad::Reader::new(mmap.as_ref()).unwrap(); + indexing::run( &env_cloned, &index_cloned, &indexer_opt_cloned, - content, - gzipped, - |count| { + fields_ids_map, + users_ids_documents_ids, + new_documents_ids, + documents, + documents_count as u32, + |count, total| { // We send progress status... let meta = UpdateMetaProgress::DocumentsAddition { processed_number_of_documents: count as usize, - total_number_of_documents, + total_number_of_documents: Some(total as usize), }; let progress = UpdateStatus::Progressing { update_id, meta }; let _ = update_status_sender_cloned.send(progress); @@ -173,38 +217,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { ) }, UpdateMeta::DocumentsAdditionFromPath { path } => { - let file = match File::open(&path) { - Ok(file) => file, - Err(e) => { - let meta = format!("documents addition file ({}) error: {}", path.display(), e); - return Ok(meta); - } - }; - let content = match unsafe { memmap::Mmap::map(&file) } { - Ok(mmap) => mmap, - Err(e) => { - let meta = format!("documents addition file ({}) mmap error: {}", path.display(), e); - return Ok(meta); - }, - }; - - let gzipped = path.extension().map_or(false, |e| e == "gz" || e == "gzip"); - indexing::run( - &env_cloned, - &index_cloned, - &indexer_opt_cloned, - &content, - gzipped, - |count| { - // We send progress status... - let meta = UpdateMetaProgress::DocumentsAddition { - processed_number_of_documents: count as usize, - total_number_of_documents: None, - }; - let progress = UpdateStatus::Progressing { update_id, meta }; - let _ = update_status_sender_cloned.send(progress); - }, - ) + todo!() } }; @@ -388,7 +401,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let mut record = record.iter() .map(|(key_id, value)| { let key = fields_ids_map.name(key_id).unwrap().to_owned(); - let value = std::str::from_utf8(value).unwrap().to_owned(); + // TODO we must deserialize a Json Value and highlight it. 
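Documents now store JSON-encoded values, so the display path can no longer assume plain UTF-8 strings; the TODO above notes that a `serde_json::Value` has to be decoded (and eventually highlighted). A hedged sketch of that decoding step with a simple fallback for non-string values, leaving the highlighting itself aside:

```rust
use serde_json::Value;

/// Decode an obkv field value (stored as JSON bytes) into displayable text.
/// JSON strings are returned as-is, other values fall back to their JSON
/// representation; highlighting, mentioned in the TODO above, is out of scope.
fn displayable_value(bytes: &[u8]) -> String {
    match serde_json::from_slice::<Value>(bytes) {
        Ok(Value::String(s)) => s,
        Ok(other) => other.to_string(),
        Err(_) => String::from_utf8_lossy(bytes).into_owned(),
    }
}

fn main() {
    assert_eq!(displayable_value(br#""Relativity""#), "Relativity");
    assert_eq!(displayable_value(b"1953"), "1953");
}
```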
+ let value = serde_json::from_slice(value).unwrap(); (key, value) }) .collect(); @@ -423,7 +437,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let file = file.into_std().await; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; - let meta = UpdateMeta::DocumentsAddition { total_number_of_documents: None }; + let meta = UpdateMeta::DocumentsAddition; let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id);
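Because the progress callback now receives both the processed count and the total, the websocket payloads can carry `total_number_of_documents: Some(total)` instead of `None`, and a client can turn the pair into a percentage. A small illustrative helper for that client-side computation (the `Option` mirrors the `UpdateMetaProgress` field above):

```rust
/// Turn the (processed, total) progress pair into a percentage for display.
/// The total stays optional because `UpdateMetaProgress` keeps it as an Option.
fn progress_percentage(processed: usize, total: Option<usize>) -> Option<f64> {
    match total {
        Some(0) => Some(100.0),
        Some(total) => Some(processed as f64 * 100.0 / total as f64),
        None => None,
    }
}

fn main() {
    assert_eq!(progress_percentage(5_000, Some(20_000)), Some(25.0));
    assert_eq!(progress_percentage(5_000, None), None);
}
```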