Introduce the RwIter append heed API
parent 7e7440c431
commit 46ced5c828

Cargo.lock (generated): 9 lines changed
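In short: the manifests pin heed, heed-traits, and heed-types to the iter-append branch of Kerollmops/heed, and the indexer's merge step stops issuing one `put` per entry. It instead opens a writable iterator (the new RwIter) on each destination database and `append`s the output of a k-way merge over the temporary databases, so keys always arrive in ascending order, the precondition for an append-style write (in LMDB terms this presumably maps to the MDB_APPEND fast path). The pattern, excerpted from the diff below as a sketch rather than standalone code:

// Sketch of the new write pattern, assuming the `iter-append` branch API
// used in this diff: `iter_mut` returns an RwIter that supports `append`.
let mut dest = index.word_positions.iter_mut(wtxn)?;
for result in MergeIter::new(sources) {
    let (k, v) = result?;
    // Keys come out of the merge iterator in ascending order,
    // which is exactly what `append` requires.
    dest.append(&k, &v)?;
}
drop(dest); // release the iterator's borrow of the write transaction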
Cargo.lock (generated):

@@ -543,8 +543,7 @@ dependencies = [
 [[package]]
 name = "heed"
 version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd7882b766b4be1b90d8ce5ce4c7aca2539b43176a708dbc8e79576dbbdbba93"
+source = "git+https://github.com/Kerollmops/heed?branch=iter-append#e2d49e06d14771c955847fce8898d8fb1cc16c0b"
 dependencies = [
  "byteorder",
  "heed-traits",

@@ -560,14 +559,12 @@ dependencies = [
 [[package]]
 name = "heed-traits"
 version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c"
+source = "git+https://github.com/Kerollmops/heed?branch=iter-append#e2d49e06d14771c955847fce8898d8fb1cc16c0b"
 
 [[package]]
 name = "heed-types"
 version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e100387815256b00dbb4f48db990f7fa03e9b88b4a89c2a1661b7d9d77b77c46"
+source = "git+https://github.com/Kerollmops/heed?branch=iter-append#e2d49e06d14771c955847fce8898d8fb1cc16c0b"
 dependencies = [
  "bincode",
  "heed-traits",
Cargo.toml:

@@ -13,7 +13,8 @@ cow-utils = "0.1.2"
 csv = "1.1.3"
 fst = "0.4.3"
 fxhash = "0.2.1"
-heed = { version = "0.8.0", default-features = false, features = ["lmdb"] }
+# heed = { version = "0.8.0", default-features = false, features = ["lmdb"] }
+heed = { git = "https://github.com/Kerollmops/heed", branch = "iter-append", default-features = false, features = ["lmdb"] }
 jemallocator = "0.3.2"
 levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] }
 linked-hash-map = "0.5.3"
Rust indexer source (file name not captured in this view):

@@ -2,7 +2,7 @@ use std::collections::hash_map::Entry;
 use std::collections::{HashMap, BTreeSet};
 use std::convert::{TryFrom, TryInto};
 use std::hash::{Hash, BuildHasher};
-use std::{cmp, io};
+use std::io;
 use std::iter::FromIterator;
 use std::path::{Path, PathBuf};
 use std::time::Instant;

@@ -42,7 +42,7 @@ struct Opt {
     database: PathBuf,
 
     /// CSV file to index.
-    csv_file: Option<PathBuf>,
+    csv_file: PathBuf,
 }
 
 fn put_evicted_into_heed<I>(wtxn: &mut heed::RwTxn, index: &Index, iter: I) -> anyhow::Result<()>

@@ -150,7 +150,7 @@ fn index_csv<R: io::Read>(
         }
     }
 
-    if thread_index == 0 {
+    if document_id as usize % num_threads == thread_index {
         // We write the document in the database.
         let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
         writer.write_byte_record(document.as_byte_record())?;
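With `csv_file` now required and every thread reading the whole CSV, the @ -150 hunk above is what partitions the work: a thread keeps a document only when `document_id as usize % num_threads == thread_index`, so the shards are disjoint and together cover every id. A tiny standalone illustration of that predicate (the `is_mine` helper is hypothetical, not code from the diff):

// Hypothetical illustration of the modulo sharding used in index_csv:
// with 3 threads, documents 0,3,6,... go to thread 0; 1,4,7,... to thread 1; etc.
fn is_mine(document_id: u32, num_threads: usize, thread_index: usize) -> bool {
    document_id as usize % num_threads == thread_index
}

fn main() {
    assert!(is_mine(6, 3, 0));
    assert!(is_mine(7, 3, 1));
    assert!(!is_mine(8, 3, 0));
}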
@@ -217,18 +217,174 @@ fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> any
     Ok(())
 }
 
+use std::collections::binary_heap::{BinaryHeap, PeekMut};
+use std::cmp::{Ordering, Reverse};
+
+// ------------ Value
+
+struct Value<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+{
+    iter: heed::RoIter<'t, KC, DC>,
+    value: Option<heed::Result<(KC::DItem, DC::DItem)>>,
+}
+
+impl<'t, KC, DC> Value<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+{
+    fn new(mut iter: heed::RoIter<'t, KC, DC>) -> Option<Value<'t, KC, DC>> {
+        iter.next().map(|value| Value { iter, value: Some(value) })
+    }
+
+    fn peek_value(&mut self) -> Option<heed::Result<(KC::DItem, DC::DItem)>> {
+        std::mem::replace(&mut self.value, self.iter.next())
+    }
+}
+
+impl<'t, KC, DC> Ord for Value<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+    KC::DItem: Ord,
+{
+    fn cmp(&self, other: &Self) -> Ordering {
+        let a = self.value.as_ref().unwrap();
+        let b = other.value.as_ref().unwrap();
+        match (a, b) {
+            (Ok((a, _)), Ok((b, _))) => a.cmp(&b),
+            (Err(_), Err(_)) => Ordering::Equal,
+            (Err(_), _) => Ordering::Less,
+            (_, Err(_)) => Ordering::Greater,
+        }
+    }
+}
+
+impl<'t, KC, DC> Eq for Value<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+    KC::DItem: Ord,
+{ }
+
+impl<'t, KC, DC> PartialEq for Value<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+    KC::DItem: Ord,
+{
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other) == Ordering::Equal
+    }
+}
+
+impl<'t, KC, DC> PartialOrd for Value<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+    KC::DItem: Ord,
+{
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+// ------------
+
+struct MergeIter<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+{
+    iters: BinaryHeap<Reverse<Value<'t, KC, DC>>>,
+}
+
+impl<'t, KC, DC> MergeIter<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+    KC::DItem: Ord,
+{
+    fn new(iters: Vec<heed::RoIter<'t, KC, DC>>) -> MergeIter<'t, KC, DC> {
+        let iters = iters.into_iter().filter_map(Value::new).map(Reverse).collect();
+        MergeIter { iters }
+    }
+}
+
+impl<'t, KC, DC> Iterator for MergeIter<'t, KC, DC>
+where
+    KC: heed::BytesDecode<'t>,
+    DC: heed::BytesDecode<'t>,
+    KC::DItem: Ord,
+{
+    type Item = heed::Result<(KC::DItem, DC::DItem)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut peek = self.iters.peek_mut()?;
+        let result = peek.0.peek_value().unwrap();
+
+        if peek.0.value.is_none() {
+            PeekMut::pop(peek);
+        }
+
+        Some(result)
+    }
+}
+
 fn merge_databases(
-    others: Vec<(usize, Option<TempDir>, Env, Index)>,
+    others: Vec<(TempDir, Env, Index)>,
     wtxn: &mut heed::RwTxn,
     index: &Index,
 ) -> anyhow::Result<()>
 {
     eprintln!("Merging the temporary databases...");
 
-    let mut fsts = Vec::new();
-    for (_i, _dir, env, oindex) in others {
-        let rtxn = env.read_txn()?;
+    let rtxns: Result<Vec<_>, _> = others.iter().map(|(_, env, _)| env.read_txn()).collect();
+    let rtxns = rtxns?;
+
+    // merge the word positions
+    let sources: Result<Vec<_>, _> = others.iter().zip(&rtxns).map(|((.., i), t)| i.word_positions.iter(t)).collect();
+    let sources = sources?;
+    let mut dest = index.word_positions.iter_mut(wtxn)?;
+    let before = Instant::now();
+    for result in MergeIter::new(sources) {
+        let (k, v) = result?;
+        dest.append(&k, &v)?;
+    }
+    eprintln!("Merging the word_positions database took {:.02?}.", before.elapsed());
+    drop(dest);
+
+    // merge the word position documents ids
+    let sources: Result<Vec<_>, _> = others.iter().zip(&rtxns).map(|((.., i), t)| i.word_position_docids.iter(t)).collect();
+    let sources = sources?;
+    let mut dest = index.word_position_docids.iter_mut(wtxn)?;
+    let before = Instant::now();
+    for result in MergeIter::new(sources) {
+        let (k, v) = result?;
+        dest.append(&k, &v)?;
+    }
+    eprintln!("Merging the word_position_docids database took {:.02?}.", before.elapsed());
+    drop(dest);
+
+    // merge the documents
+    let sources: Result<Vec<_>, _> = others.iter().zip(&rtxns).map(|((.., i), t)| {
+        i.documents.as_polymorph().iter::<_, ByteSlice, ByteSlice>(t)
+    }).collect();
+    let sources = sources?;
+    let mut dest = index.documents.as_polymorph().iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
+    let before = Instant::now();
+    for result in MergeIter::new(sources) {
+        let (k, v) = result?;
+        dest.append(&k, &v)?;
+    }
+    eprintln!("Merging the documents database took {:.02?}.", before.elapsed());
+    drop(dest);
+
+    let mut fsts = Vec::new();
+    for ((_dir, _env, oindex), rtxn) in others.into_iter().zip(&rtxns) {
         // merge and check the headers are equal
         let headers = oindex.headers(&rtxn)?.context("A database is missing the headers")?;
         match index.headers(wtxn)? {
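The MergeIter introduced above is a classic k-way merge: each source RoIter is wrapped in a Value that caches its current head, a BinaryHeap<Reverse<Value>> always exposes the source with the smallest head, `peek_value` swaps the cached head for the iterator's next element, and `PeekMut::pop` drops a source once it runs dry (errors sort first, so they surface early). The same technique over plain sorted vectors, as a self-contained runnable sketch using only std, no heed types:

use std::cmp::{Ordering, Reverse};
use std::collections::binary_heap::{BinaryHeap, PeekMut};
use std::vec::IntoIter;

// One source iterator plus its cached head element, mirroring `Value` above.
struct Head {
    value: u32,
    iter: IntoIter<u32>,
}

impl Ord for Head {
    fn cmp(&self, other: &Self) -> Ordering { self.value.cmp(&other.value) }
}
impl PartialOrd for Head {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl PartialEq for Head {
    fn eq(&self, other: &Self) -> bool { self.value == other.value }
}
impl Eq for Head {}

// k-way merge of already-sorted vectors: `Reverse` turns the max-heap
// into a min-heap, so the smallest head is always on top.
fn kmerge(sources: Vec<Vec<u32>>) -> Vec<u32> {
    let mut heap: BinaryHeap<Reverse<Head>> = sources
        .into_iter()
        .filter_map(|source| {
            let mut iter = source.into_iter();
            iter.next().map(|value| Reverse(Head { value, iter }))
        })
        .collect();

    let mut out = Vec::new();
    while let Some(mut peek) = heap.peek_mut() {
        out.push(peek.0.value);
        match peek.0.iter.next() {
            // Replacing the head re-sifts the heap when `peek` drops.
            Some(next) => peek.0.value = next,
            // This source is exhausted: remove it from the heap.
            None => { PeekMut::pop(peek); }
        }
    }
    out
}

fn main() {
    let merged = kmerge(vec![vec![1, 4, 9], vec![2, 3, 8], vec![5, 6, 7]]);
    assert_eq!(merged, (1..=9).collect::<Vec<u32>>());
}

Because the output of such a merge is globally sorted, feeding it straight into `dest.append` is valid; that is the whole point of pairing MergeIter with the new RwIter API.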
@@ -240,30 +396,9 @@ fn merge_databases(
         let fst = oindex.fst(&rtxn)?.context("A database is missing its FST")?;
         let fst = fst.map_data(|s| s.to_vec())?;
         fsts.push(fst);
-
-        // merge the words positions
-        for result in oindex.word_positions.iter(&rtxn)? {
-            let (word, pos) = result?;
-            index.word_positions.put(wtxn, word, &pos)?;
-        }
-
-        // merge the documents ids by word and position
-        for result in oindex.word_position_docids.iter(&rtxn)? {
-            let (key, docids) = result?;
-            index.word_position_docids.put(wtxn, key, &docids)?;
-        }
-
-        // merge the documents ids by word and attribute
-        for result in oindex.word_attribute_docids.iter(&rtxn)? {
-            let (key, docids) = result?;
-            index.word_attribute_docids.put(wtxn, key, &docids)?;
-        }
-
-        for result in oindex.documents.iter(&rtxn)? {
-            let (id, content) = result?;
-            index.documents.put(wtxn, &id, &content)?;
-        }
     }
 
+    let before = Instant::now();
     // Merge all the FSTs to create a final one and write it in the final database.
     if let Some(fst) = index.fst(wtxn)? {

@@ -279,6 +414,8 @@ fn merge_databases(
     index.put_fst(wtxn, &fst)?;
 
+    eprintln!("Merging the FSTs took {:.02?}.", before.elapsed());
+
     Ok(())
 }
@@ -296,55 +433,36 @@ fn open_env_index(path: impl AsRef<Path>) -> anyhow::Result<(Env, Index)> {
 
 fn main() -> anyhow::Result<()> {
     let opt = Opt::from_args();
-    std::fs::create_dir_all(&opt.database)?;
 
-    match &opt.csv_file {
-        Some(path) => {
-            let num_threads = rayon::current_num_threads();
+    std::fs::create_dir_all(&opt.database)?;
+    let (env, index) = open_env_index(&opt.database)?;
+
+    let num_threads = rayon::current_num_threads();
 
-            let result: Result<Vec<_>, anyhow::Error> =
+    let result: anyhow::Result<_> =
         (0..num_threads).into_par_iter().map(|i| {
-            let (dir, env, index) = if i == 0 {
-                let (env, index) = open_env_index(&opt.database)?;
-                (None, env, index)
-            } else {
             let dir = tempfile::tempdir()?;
             let (env, index) = open_env_index(&dir)?;
-                (Some(dir), env, index)
-            };
 
             let mut wtxn = env.write_txn()?;
-            let rdr = csv::Reader::from_path(path)?;
+            let rdr = csv::Reader::from_path(&opt.csv_file)?;
             index_csv(&mut wtxn, rdr, &index, num_threads, i)?;
 
             wtxn.commit()?;
 
-            Ok((i, dir, env, index))
+            Ok((dir, env, index))
         })
         .collect();
 
-    let mut parts = result?;
-    parts.sort_unstable_by_key(|&(i, ..)| cmp::Reverse(i));
-
-    let (_, _, env, index) = parts.pop().context("missing base database")?;
-
-    // TODO we can merge databases that are ready to be merged
-    // into the final one, without having to wait for all of them.
-    // TODO we can reuse an already existing database instead of creating a new one
-    // it would be even better to use the first one as it contains the documents.
     let mut wtxn = env.write_txn()?;
+    let parts = result?;
     merge_databases(parts, &mut wtxn, &index)?;
 
     compute_words_attributes_docids(&mut wtxn, &index)?;
     let count = index.documents.len(&wtxn)?;
 
     wtxn.commit()?;
 
     eprintln!("Wrote {} documents into LMDB", count);
-        },
-        None => todo!("support for stdin CSV while indexing in parallel"),
-    };
 
     Ok(())
 }