mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Move from oxidized-mtbl to grenad
This commit is contained in:
parent
b342a86c15
commit
f980422c57
3 changed files with 130 additions and 131 deletions
|
@ -1,7 +1,7 @@
|
|||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::convert::TryFrom;
|
||||
use std::fs::File;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::io::{self, Read, Write, Seek, SeekFrom};
|
||||
use std::iter::FromIterator;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
|
@ -16,8 +16,7 @@ use fst::IntoStreamer;
|
|||
use heed::{EnvOpenOptions, BytesEncode, types::ByteSlice};
|
||||
use linked_hash_map::LinkedHashMap;
|
||||
use log::{debug, info};
|
||||
use memmap::Mmap;
|
||||
use oxidized_mtbl::{Reader, Writer, Merger, Sorter, CompressionType};
|
||||
use grenad::{Reader, FileFuse, Writer, Merger, Sorter, CompressionType};
|
||||
use rayon::prelude::*;
|
||||
use roaring::RoaringBitmap;
|
||||
use structopt::StructOpt;
|
||||
|
@ -110,6 +109,9 @@ struct IndexerOpt {
|
|||
/// The level of compression of the chosen algorithm.
|
||||
#[structopt(long, requires = "chunk-compression-type")]
|
||||
chunk_compression_level: Option<u32>,
|
||||
|
||||
#[structopt(long, default_value = "4294967296")] // 4 GB
|
||||
file_fusing_shrink_size: u64,
|
||||
}
|
||||
|
||||
fn format_count(n: usize) -> String {
|
||||
|
@ -120,7 +122,7 @@ fn lmdb_key_valid_size(key: &[u8]) -> bool {
|
|||
!key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH
|
||||
}
|
||||
|
||||
fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> Writer<File> {
|
||||
fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
|
||||
let mut builder = Writer::builder();
|
||||
builder.compression_type(typ);
|
||||
if let Some(level) = level {
|
||||
|
@ -129,21 +131,24 @@ fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> Writer
|
|||
builder.build(file)
|
||||
}
|
||||
|
||||
fn writer_into_reader(writer: Writer<File>) -> anyhow::Result<Reader<Mmap>> {
|
||||
let file = writer.into_inner()?;
|
||||
let mmap = unsafe { Mmap::map(&file)? };
|
||||
Reader::new(mmap).map_err(Into::into)
|
||||
fn writer_into_reader(writer: Writer<File>, shrink_size: u64) -> anyhow::Result<Reader<FileFuse>> {
|
||||
let mut file = writer.into_inner()?;
|
||||
file.seek(SeekFrom::Start(0))?;
|
||||
let file = FileFuse::with_shrink_size(file, shrink_size);
|
||||
Reader::new(file).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn create_sorter(
|
||||
merge: MergeFn,
|
||||
chunk_compression_type: CompressionType,
|
||||
chunk_compression_level: Option<u32>,
|
||||
file_fusing_shrink_size: u64,
|
||||
max_nb_chunks: Option<usize>,
|
||||
max_memory: Option<usize>,
|
||||
) -> Sorter<MergeFn>
|
||||
{
|
||||
let mut builder = Sorter::builder(merge);
|
||||
builder.file_fusing_shrink_size(file_fusing_shrink_size);
|
||||
builder.chunk_compression_type(chunk_compression_type);
|
||||
if let Some(level) = chunk_compression_level {
|
||||
builder.chunk_compression_level(level);
|
||||
|
@ -194,11 +199,11 @@ fn compute_words_pair_proximities(
|
|||
type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
|
||||
|
||||
struct Readers {
|
||||
main: Reader<Mmap>,
|
||||
word_docids: Reader<Mmap>,
|
||||
docid_word_positions: Reader<Mmap>,
|
||||
words_pairs_proximities_docids: Reader<Mmap>,
|
||||
documents: Reader<Mmap>,
|
||||
main: Reader<FileFuse>,
|
||||
word_docids: Reader<FileFuse>,
|
||||
docid_word_positions: Reader<FileFuse>,
|
||||
words_pairs_proximities_docids: Reader<FileFuse>,
|
||||
documents: Reader<FileFuse>,
|
||||
}
|
||||
|
||||
struct Store {
|
||||
|
@ -210,6 +215,7 @@ struct Store {
|
|||
// MTBL parameters
|
||||
chunk_compression_type: CompressionType,
|
||||
chunk_compression_level: Option<u32>,
|
||||
file_fusing_shrink_size: u64,
|
||||
// MTBL sorters
|
||||
main_sorter: Sorter<MergeFn>,
|
||||
word_docids_sorter: Sorter<MergeFn>,
|
||||
|
@ -226,6 +232,7 @@ impl Store {
|
|||
max_memory: Option<usize>,
|
||||
chunk_compression_type: CompressionType,
|
||||
chunk_compression_level: Option<u32>,
|
||||
file_fusing_shrink_size: u64,
|
||||
) -> anyhow::Result<Store>
|
||||
{
|
||||
// We divide the max memory by the number of sorter the Store have.
|
||||
|
@ -235,6 +242,7 @@ impl Store {
|
|||
main_merge,
|
||||
chunk_compression_type,
|
||||
chunk_compression_level,
|
||||
file_fusing_shrink_size,
|
||||
max_nb_chunks,
|
||||
max_memory,
|
||||
);
|
||||
|
@ -242,6 +250,7 @@ impl Store {
|
|||
word_docids_merge,
|
||||
chunk_compression_type,
|
||||
chunk_compression_level,
|
||||
file_fusing_shrink_size,
|
||||
max_nb_chunks,
|
||||
max_memory,
|
||||
);
|
||||
|
@ -249,14 +258,15 @@ impl Store {
|
|||
words_pairs_proximities_docids_merge,
|
||||
chunk_compression_type,
|
||||
chunk_compression_level,
|
||||
file_fusing_shrink_size,
|
||||
max_nb_chunks,
|
||||
max_memory,
|
||||
);
|
||||
|
||||
let documents_writer = tempfile().map(|f| {
|
||||
let documents_writer = tempfile().and_then(|f| {
|
||||
create_writer(chunk_compression_type, chunk_compression_level, f)
|
||||
})?;
|
||||
let docid_word_positions_writer = tempfile().map(|f| {
|
||||
let docid_word_positions_writer = tempfile().and_then(|f| {
|
||||
create_writer(chunk_compression_type, chunk_compression_level, f)
|
||||
})?;
|
||||
|
||||
|
@ -268,6 +278,7 @@ impl Store {
|
|||
documents_ids: RoaringBitmap::new(),
|
||||
chunk_compression_type,
|
||||
chunk_compression_level,
|
||||
file_fusing_shrink_size,
|
||||
|
||||
main_sorter,
|
||||
word_docids_sorter,
|
||||
|
@ -510,6 +521,7 @@ impl Store {
|
|||
fn finish(mut self) -> anyhow::Result<Readers> {
|
||||
let comp_type = self.chunk_compression_type;
|
||||
let comp_level = self.chunk_compression_level;
|
||||
let shrink_size = self.file_fusing_shrink_size;
|
||||
|
||||
Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?;
|
||||
Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?;
|
||||
|
@ -518,12 +530,11 @@ impl Store {
|
|||
self.words_pairs_proximities_docids,
|
||||
)?;
|
||||
|
||||
let mut word_docids_wtr = tempfile().map(|f| create_writer(comp_type, comp_level, f))?;
|
||||
let mut word_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
||||
let mut builder = fst::SetBuilder::memory();
|
||||
|
||||
let mut iter = self.word_docids_sorter.into_iter()?;
|
||||
while let Some(result) = iter.next() {
|
||||
let (word, val) = result?;
|
||||
while let Some((word, val)) = iter.next()? {
|
||||
// This is a lexicographically ordered word position
|
||||
// we use the key to construct the words fst.
|
||||
builder.insert(word)?;
|
||||
|
@ -533,17 +544,17 @@ impl Store {
|
|||
let fst = builder.into_set();
|
||||
self.main_sorter.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?;
|
||||
|
||||
let mut main_wtr = tempfile().map(|f| create_writer(comp_type, comp_level, f))?;
|
||||
let mut main_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
||||
self.main_sorter.write_into(&mut main_wtr)?;
|
||||
|
||||
let mut words_pairs_proximities_docids_wtr = tempfile().map(|f| create_writer(comp_type, comp_level, f))?;
|
||||
let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
|
||||
self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?;
|
||||
|
||||
let main = writer_into_reader(main_wtr)?;
|
||||
let word_docids = writer_into_reader(word_docids_wtr)?;
|
||||
let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr)?;
|
||||
let docid_word_positions = writer_into_reader(self.docid_word_positions_writer)?;
|
||||
let documents = writer_into_reader(self.documents_writer)?;
|
||||
let main = writer_into_reader(main_wtr, shrink_size)?;
|
||||
let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?;
|
||||
let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?;
|
||||
let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?;
|
||||
let documents = writer_into_reader(self.documents_writer, shrink_size)?;
|
||||
|
||||
Ok(Readers {
|
||||
main,
|
||||
|
@ -614,7 +625,7 @@ fn documents_merge(key: &[u8], _values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
|
|||
panic!("merging documents is an error ({:?})", key.as_bstr())
|
||||
}
|
||||
|
||||
fn merge_readers(sources: Vec<Reader<Mmap>>, merge: MergeFn) -> Merger<Mmap, MergeFn> {
|
||||
fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> {
|
||||
let mut builder = Merger::builder(merge);
|
||||
builder.extend(sources);
|
||||
builder.build()
|
||||
|
@ -623,7 +634,7 @@ fn merge_readers(sources: Vec<Reader<Mmap>>, merge: MergeFn) -> Merger<Mmap, Mer
|
|||
fn merge_into_lmdb_database(
|
||||
wtxn: &mut heed::RwTxn,
|
||||
database: heed::PolyDatabase,
|
||||
sources: Vec<Reader<Mmap>>,
|
||||
sources: Vec<Reader<FileFuse>>,
|
||||
merge: MergeFn,
|
||||
) -> anyhow::Result<()> {
|
||||
debug!("Merging {} MTBL stores...", sources.len());
|
||||
|
@ -633,8 +644,7 @@ fn merge_into_lmdb_database(
|
|||
let mut in_iter = merger.into_merge_iter()?;
|
||||
|
||||
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
||||
while let Some(result) = in_iter.next() {
|
||||
let (k, v) = result?;
|
||||
while let Some((k, v)) = in_iter.next()? {
|
||||
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
|
||||
}
|
||||
|
||||
|
@ -645,15 +655,13 @@ fn merge_into_lmdb_database(
|
|||
fn write_into_lmdb_database(
|
||||
wtxn: &mut heed::RwTxn,
|
||||
database: heed::PolyDatabase,
|
||||
reader: Reader<Mmap>,
|
||||
mut reader: Reader<FileFuse>,
|
||||
) -> anyhow::Result<()> {
|
||||
debug!("Writing MTBL stores...");
|
||||
let before = Instant::now();
|
||||
|
||||
let mut in_iter = reader.into_iter()?;
|
||||
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
|
||||
while let Some(result) = in_iter.next() {
|
||||
let (k, v) = result?;
|
||||
while let Some((k, v)) = reader.next()? {
|
||||
out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?;
|
||||
}
|
||||
|
||||
|
@ -746,6 +754,7 @@ fn main() -> anyhow::Result<()> {
|
|||
let max_memory_by_job = opt.indexer.max_memory / num_threads;
|
||||
let chunk_compression_type = opt.indexer.chunk_compression_type;
|
||||
let chunk_compression_level = opt.indexer.chunk_compression_level;
|
||||
let file_fusing_shrink_size = opt.indexer.file_fusing_shrink_size;
|
||||
let log_every_n = opt.indexer.log_every_n;
|
||||
|
||||
let readers = csv_readers(opt.csv_file, num_threads)?
|
||||
|
@ -758,6 +767,7 @@ fn main() -> anyhow::Result<()> {
|
|||
Some(max_memory_by_job),
|
||||
chunk_compression_type,
|
||||
chunk_compression_level,
|
||||
file_fusing_shrink_size,
|
||||
)?;
|
||||
store.index_csv(rdr, i, num_threads, log_every_n)
|
||||
})
|
||||
|
@ -779,12 +789,12 @@ fn main() -> anyhow::Result<()> {
|
|||
// This is the function that merge the readers
|
||||
// by using the given merge function.
|
||||
let merge_readers = move |readers, merge| {
|
||||
let mut writer = tempfile().map(|f| {
|
||||
let mut writer = tempfile().and_then(|f| {
|
||||
create_writer(chunk_compression_type, chunk_compression_level, f)
|
||||
})?;
|
||||
let merger = merge_readers(readers, merge);
|
||||
merger.write_into(&mut writer)?;
|
||||
writer_into_reader(writer)
|
||||
writer_into_reader(writer, file_fusing_shrink_size)
|
||||
};
|
||||
|
||||
// The enum and the channel which is used to transfert
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue