mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-02 09:35:45 +01:00
Finalize the LMDB indexing design
This commit is contained in:
parent
2ae3f40971
commit
7e7440c431
@ -2,22 +2,25 @@ use std::collections::hash_map::Entry;
|
|||||||
use std::collections::{HashMap, BTreeSet};
|
use std::collections::{HashMap, BTreeSet};
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use std::hash::{Hash, BuildHasher};
|
use std::hash::{Hash, BuildHasher};
|
||||||
use std::io;
|
use std::{cmp, io};
|
||||||
use std::path::PathBuf;
|
use std::iter::FromIterator;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::{ensure, Context};
|
||||||
use cow_utils::CowUtils;
|
use fst::{Streamer, set::OpBuilder};
|
||||||
use fst::Streamer;
|
|
||||||
use heed::EnvOpenOptions;
|
|
||||||
use heed::types::*;
|
use heed::types::*;
|
||||||
|
use heed::{Env, EnvOpenOptions};
|
||||||
|
use rayon::prelude::*;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use slice_group_by::StrGroupBy;
|
use slice_group_by::StrGroupBy;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
use mega_mini_indexer::cache::ArcCache;
|
use mega_mini_indexer::cache::ArcCache;
|
||||||
use mega_mini_indexer::{BEU32, Index, DocumentId, FastMap4};
|
use mega_mini_indexer::{BEU32, Index, DocumentId, FastMap4};
|
||||||
|
|
||||||
|
const ONE_MILLION: u32 = 1_000_000;
|
||||||
const MAX_POSITION: usize = 1000;
|
const MAX_POSITION: usize = 1000;
|
||||||
const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
|
const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
|
||||||
|
|
||||||
@ -25,8 +28,6 @@ const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
|
|||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
||||||
|
|
||||||
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0); // AtomicU32 ?
|
|
||||||
|
|
||||||
pub fn simple_alphanumeric_tokens(string: &str) -> impl Iterator<Item = &str> {
|
pub fn simple_alphanumeric_tokens(string: &str) -> impl Iterator<Item = &str> {
|
||||||
let is_alphanumeric = |s: &&str| s.chars().next().map_or(false, char::is_alphanumeric);
|
let is_alphanumeric = |s: &&str| s.chars().next().map_or(false, char::is_alphanumeric);
|
||||||
string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric)
|
string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric)
|
||||||
@ -95,12 +96,19 @@ fn index_csv<R: io::Read>(
|
|||||||
writer.write_byte_record(headers.as_byte_record())?;
|
writer.write_byte_record(headers.as_byte_record())?;
|
||||||
let headers = writer.into_inner()?;
|
let headers = writer.into_inner()?;
|
||||||
|
|
||||||
|
let mut document_id = 0usize;
|
||||||
|
let mut before = Instant::now();
|
||||||
let mut document = csv::StringRecord::new();
|
let mut document = csv::StringRecord::new();
|
||||||
|
|
||||||
while rdr.read_record(&mut document)? {
|
while rdr.read_record(&mut document)? {
|
||||||
let document_id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst);
|
document_id = document_id + 1;
|
||||||
let document_id = DocumentId::try_from(document_id).context("Generated id is too big")?;
|
let document_id = DocumentId::try_from(document_id).context("Generated id is too big")?;
|
||||||
|
|
||||||
|
if thread_index == 0 && document_id % ONE_MILLION == 0 {
|
||||||
|
eprintln!("Document {}m just processed ({:.02?} elapsed).", document_id / ONE_MILLION, before.elapsed());
|
||||||
|
before = Instant::now();
|
||||||
|
}
|
||||||
|
|
||||||
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
|
for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
|
||||||
for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
|
for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
|
||||||
if !word.is_empty() && word.len() < 500 { // LMDB limits
|
if !word.is_empty() && word.len() < 500 { // LMDB limits
|
||||||
@ -142,12 +150,14 @@ fn index_csv<R: io::Read>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if thread_index == 0 {
|
||||||
// We write the document in the database.
|
// We write the document in the database.
|
||||||
let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
|
let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
|
||||||
writer.write_byte_record(document.as_byte_record())?;
|
writer.write_byte_record(document.as_byte_record())?;
|
||||||
let document = writer.into_inner()?;
|
let document = writer.into_inner()?;
|
||||||
index.documents.put(wtxn, &BEU32::new(document_id), &document)?;
|
index.documents.put(wtxn, &BEU32::new(document_id), &document)?;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
put_evicted_into_heed(wtxn, index, words_cache)?;
|
put_evicted_into_heed(wtxn, index, words_cache)?;
|
||||||
|
|
||||||
@ -207,37 +217,134 @@ fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> any
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> anyhow::Result<()> {
|
fn merge_databases(
|
||||||
let opt = Opt::from_args();
|
others: Vec<(usize, Option<TempDir>, Env, Index)>,
|
||||||
|
wtxn: &mut heed::RwTxn,
|
||||||
|
index: &Index,
|
||||||
|
) -> anyhow::Result<()>
|
||||||
|
{
|
||||||
|
eprintln!("Merging the temporary databases...");
|
||||||
|
|
||||||
std::fs::create_dir_all(&opt.database)?;
|
let mut fsts = Vec::new();
|
||||||
|
for (_i, _dir, env, oindex) in others {
|
||||||
|
let rtxn = env.read_txn()?;
|
||||||
|
|
||||||
|
// merge and check the headers are equal
|
||||||
|
let headers = oindex.headers(&rtxn)?.context("A database is missing the headers")?;
|
||||||
|
match index.headers(wtxn)? {
|
||||||
|
Some(h) => ensure!(h == headers, "headers are not equal"),
|
||||||
|
None => index.put_headers(wtxn, &headers)?,
|
||||||
|
};
|
||||||
|
|
||||||
|
// retrieve the FSTs to merge them together in one run.
|
||||||
|
let fst = oindex.fst(&rtxn)?.context("A database is missing its FST")?;
|
||||||
|
let fst = fst.map_data(|s| s.to_vec())?;
|
||||||
|
fsts.push(fst);
|
||||||
|
|
||||||
|
// merge the words positions
|
||||||
|
for result in oindex.word_positions.iter(&rtxn)? {
|
||||||
|
let (word, pos) = result?;
|
||||||
|
index.word_positions.put(wtxn, word, &pos)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge the documents ids by word and position
|
||||||
|
for result in oindex.word_position_docids.iter(&rtxn)? {
|
||||||
|
let (key, docids) = result?;
|
||||||
|
index.word_position_docids.put(wtxn, key, &docids)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge the documents ids by word and attribute
|
||||||
|
for result in oindex.word_attribute_docids.iter(&rtxn)? {
|
||||||
|
let (key, docids) = result?;
|
||||||
|
index.word_attribute_docids.put(wtxn, key, &docids)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
for result in oindex.documents.iter(&rtxn)? {
|
||||||
|
let (id, content) = result?;
|
||||||
|
index.documents.put(wtxn, &id, &content)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge all the FSTs to create a final one and write it in the final database.
|
||||||
|
if let Some(fst) = index.fst(wtxn)? {
|
||||||
|
let fst = fst.map_data(|s| s.to_vec())?;
|
||||||
|
fsts.push(fst);
|
||||||
|
}
|
||||||
|
|
||||||
|
let builder = OpBuilder::from_iter(&fsts);
|
||||||
|
let op = builder.r#union();
|
||||||
|
let mut builder = fst::set::SetBuilder::memory();
|
||||||
|
builder.extend_stream(op)?;
|
||||||
|
let fst = builder.into_set();
|
||||||
|
|
||||||
|
index.put_fst(wtxn, &fst)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_env_index(path: impl AsRef<Path>) -> anyhow::Result<(Env, Index)> {
|
||||||
let env = EnvOpenOptions::new()
|
let env = EnvOpenOptions::new()
|
||||||
.map_size(100 * 1024 * 1024 * 1024) // 100 GB
|
.map_size(100 * 1024 * 1024 * 1024) // 100 GB
|
||||||
.max_readers(10)
|
.max_readers(10)
|
||||||
.max_dbs(10)
|
.max_dbs(10)
|
||||||
.open(opt.database)?;
|
.open(path)?;
|
||||||
|
|
||||||
let index = Index::new(&env)?;
|
let index = Index::new(&env)?;
|
||||||
|
|
||||||
let mut wtxn = env.write_txn()?;
|
Ok((env, index))
|
||||||
|
}
|
||||||
|
|
||||||
match opt.csv_file {
|
fn main() -> anyhow::Result<()> {
|
||||||
|
let opt = Opt::from_args();
|
||||||
|
std::fs::create_dir_all(&opt.database)?;
|
||||||
|
|
||||||
|
match &opt.csv_file {
|
||||||
Some(path) => {
|
Some(path) => {
|
||||||
let rdr = csv::Reader::from_path(path)?;
|
let num_threads = rayon::current_num_threads();
|
||||||
index_csv(&mut wtxn, rdr, &index, 1, 0)?;
|
|
||||||
},
|
let result: Result<Vec<_>, anyhow::Error> =
|
||||||
None => {
|
(0..num_threads).into_par_iter().map(|i| {
|
||||||
let rdr = csv::Reader::from_reader(io::stdin());
|
let (dir, env, index) = if i == 0 {
|
||||||
index_csv(&mut wtxn, rdr, &index, 1, 0)?;
|
let (env, index) = open_env_index(&opt.database)?;
|
||||||
}
|
(None, env, index)
|
||||||
|
} else {
|
||||||
|
let dir = tempfile::tempdir()?;
|
||||||
|
let (env, index) = open_env_index(&dir)?;
|
||||||
|
(Some(dir), env, index)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut wtxn = env.write_txn()?;
|
||||||
|
let rdr = csv::Reader::from_path(path)?;
|
||||||
|
index_csv(&mut wtxn, rdr, &index, num_threads, i)?;
|
||||||
|
|
||||||
|
wtxn.commit()?;
|
||||||
|
|
||||||
|
Ok((i, dir, env, index))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut parts = result?;
|
||||||
|
parts.sort_unstable_by_key(|&(i, ..)| cmp::Reverse(i));
|
||||||
|
|
||||||
|
let (_, _, env, index) = parts.pop().context("missing base database")?;
|
||||||
|
|
||||||
|
// TODO we can merge databases that are ready to be merged
|
||||||
|
// into the final one, without having to wait for all of them.
|
||||||
|
// TODO we can reuse an already existing database instead of creating a new one
|
||||||
|
// it would be even better to use the first one as it contains the documents.
|
||||||
|
let mut wtxn = env.write_txn()?;
|
||||||
|
merge_databases(parts, &mut wtxn, &index)?;
|
||||||
|
|
||||||
compute_words_attributes_docids(&mut wtxn, &index)?;
|
compute_words_attributes_docids(&mut wtxn, &index)?;
|
||||||
let count = index.documents.len(&wtxn)?;
|
let count = index.documents.len(&wtxn)?;
|
||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
|
|
||||||
eprintln!("Wrote {} documents into LMDB", count);
|
eprintln!("Wrote {} documents into LMDB", count);
|
||||||
|
},
|
||||||
|
None => todo!("support for stdin CSV while indexing in parallel"),
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user