mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-26 06:30:05 +01:00
Move the documents into another file
This commit is contained in:
parent
fae694a102
commit
91282c8b6a
@ -1,6 +1,6 @@
|
|||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fs::File;
|
use std::fs::{File, OpenOptions};
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@ -31,7 +31,6 @@ const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
|
|||||||
|
|
||||||
const HEADERS_KEY: &[u8] = b"\0headers";
|
const HEADERS_KEY: &[u8] = b"\0headers";
|
||||||
const WORDS_FST_KEY: &[u8] = b"\x05words-fst";
|
const WORDS_FST_KEY: &[u8] = b"\x05words-fst";
|
||||||
const DOCUMENTS_KEY: &[u8] = b"\x06documents";
|
|
||||||
const WORD_POSITIONS_BYTE: u8 = 1;
|
const WORD_POSITIONS_BYTE: u8 = 1;
|
||||||
const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
|
const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
|
||||||
const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
|
const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
|
||||||
@ -220,7 +219,7 @@ impl Store {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn finish(mut self) -> anyhow::Result<Reader<Mmap>> {
|
pub fn finish(mut self) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> {
|
||||||
Self::write_word_positions(&mut self.sorter, self.word_positions)?;
|
Self::write_word_positions(&mut self.sorter, self.word_positions)?;
|
||||||
Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
|
Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
|
||||||
Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?;
|
Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?;
|
||||||
@ -246,13 +245,13 @@ impl Store {
|
|||||||
self.documents_sorter.write_into(&mut docs_wtr)?;
|
self.documents_sorter.write_into(&mut docs_wtr)?;
|
||||||
let docs_file = docs_wtr.into_inner()?;
|
let docs_file = docs_wtr.into_inner()?;
|
||||||
let docs_mmap = unsafe { Mmap::map(&docs_file)? };
|
let docs_mmap = unsafe { Mmap::map(&docs_file)? };
|
||||||
wtr.insert(DOCUMENTS_KEY, docs_mmap)?;
|
let docs_reader = Reader::new(docs_mmap)?;
|
||||||
|
|
||||||
let file = wtr.into_inner()?;
|
let file = wtr.into_inner()?;
|
||||||
let mmap = unsafe { Mmap::map(&file)? };
|
let mmap = unsafe { Mmap::map(&file)? };
|
||||||
let reader = Reader::new(mmap)?;
|
let reader = Reader::new(mmap)?;
|
||||||
|
|
||||||
Ok(reader)
|
Ok((reader, docs_reader))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,20 +279,6 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
|
|||||||
assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
|
assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
|
||||||
Ok(values[0].to_vec())
|
Ok(values[0].to_vec())
|
||||||
},
|
},
|
||||||
DOCUMENTS_KEY => {
|
|
||||||
let sources: Vec<_> = values.iter().map(Reader::new).collect::<Result<_, _>>().unwrap();
|
|
||||||
|
|
||||||
let mut builder = Merger::builder(docs_merge);
|
|
||||||
builder.extend(sources);
|
|
||||||
let merger = builder.build();
|
|
||||||
|
|
||||||
let mut builder = Writer::builder();
|
|
||||||
builder.compression_type(CompressionType::Snappy);
|
|
||||||
|
|
||||||
let mut wtr = builder.memory();
|
|
||||||
merger.write_into(&mut wtr).unwrap();
|
|
||||||
Ok(wtr.into_inner().unwrap())
|
|
||||||
},
|
|
||||||
key => match key[0] {
|
key => match key[0] {
|
||||||
WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
|
WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
|
||||||
let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
|
let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
|
||||||
@ -323,10 +308,6 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) ->
|
|||||||
// Write the headers
|
// Write the headers
|
||||||
index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
|
index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
|
||||||
}
|
}
|
||||||
else if key == DOCUMENTS_KEY {
|
|
||||||
// Write the documents
|
|
||||||
index.main.put::<_, Str, ByteSlice>(wtxn, "documents", val)?;
|
|
||||||
}
|
|
||||||
else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
|
else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
|
||||||
// Write the postings lists
|
// Write the postings lists
|
||||||
index.word_positions.as_polymorph()
|
index.word_positions.as_polymorph()
|
||||||
@ -373,7 +354,7 @@ fn index_csv(
|
|||||||
arc_cache_size: Option<usize>,
|
arc_cache_size: Option<usize>,
|
||||||
max_nb_chunks: Option<usize>,
|
max_nb_chunks: Option<usize>,
|
||||||
max_memory: Option<usize>,
|
max_memory: Option<usize>,
|
||||||
) -> anyhow::Result<Reader<Mmap>>
|
) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)>
|
||||||
{
|
{
|
||||||
debug!("{:?}: Indexing into a Store...", thread_index);
|
debug!("{:?}: Indexing into a Store...", thread_index);
|
||||||
|
|
||||||
@ -418,9 +399,9 @@ fn index_csv(
|
|||||||
store.write_document(document_id, &document)?;
|
store.write_document(document_id, &document)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let reader = store.finish()?;
|
let (reader, docs_reader) = store.finish()?;
|
||||||
debug!("{:?}: Store created!", thread_index);
|
debug!("{:?}: Store created!", thread_index);
|
||||||
Ok(reader)
|
Ok((reader, docs_reader))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> anyhow::Result<()> {
|
fn main() -> anyhow::Result<()> {
|
||||||
@ -441,7 +422,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.map_size(100 * 1024 * 1024 * 1024) // 100 GB
|
.map_size(100 * 1024 * 1024 * 1024) // 100 GB
|
||||||
.max_readers(10)
|
.max_readers(10)
|
||||||
.max_dbs(10)
|
.max_dbs(10)
|
||||||
.open(opt.database)?;
|
.open(&opt.database)?;
|
||||||
|
|
||||||
let index = Index::new(&env)?;
|
let index = Index::new(&env)?;
|
||||||
|
|
||||||
@ -488,17 +469,37 @@ fn main() -> anyhow::Result<()> {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let stores = csv_readers
|
let readers = csv_readers
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, rdr)| index_csv(rdr, i, num_threads, arc_cache_size, max_nb_chunks, max_memory))
|
.map(|(i, rdr)| index_csv(rdr, i, num_threads, arc_cache_size, max_nb_chunks, max_memory))
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
let mut stores = Vec::with_capacity(readers.len());
|
||||||
|
let mut docs_stores = Vec::with_capacity(readers.len());
|
||||||
|
|
||||||
|
readers.into_iter().for_each(|(s, d)| {
|
||||||
|
stores.push(s);
|
||||||
|
docs_stores.push(d);
|
||||||
|
});
|
||||||
|
|
||||||
debug!("We are writing into LMDB...");
|
debug!("We are writing into LMDB...");
|
||||||
let mut wtxn = env.write_txn()?;
|
let mut wtxn = env.write_txn()?;
|
||||||
|
|
||||||
|
// We merge the postings lists into LMDB.
|
||||||
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
|
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
|
||||||
let count = index.documents(&wtxn)?.unwrap().metadata().count_entries;
|
|
||||||
|
// We also merge the documents into its own MTBL store.
|
||||||
|
let path = opt.database.join("documents.mtbl");
|
||||||
|
let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(path)?;
|
||||||
|
let mut writer = Writer::builder().compression_type(CompressionType::Snappy).build(file);
|
||||||
|
let mut builder = Merger::builder(docs_merge);
|
||||||
|
builder.extend(docs_stores);
|
||||||
|
builder.build().write_into(&mut writer)?;
|
||||||
|
let file = writer.into_inner()?;
|
||||||
|
let mmap = unsafe { Mmap::map(&file)? };
|
||||||
|
let documents = Reader::new(&mmap)?;
|
||||||
|
let count = documents.metadata().count_entries;
|
||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
debug!("Wrote {} documents into LMDB", count);
|
debug!("Wrote {} documents into LMDB", count);
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use std::fs::File;
|
||||||
use std::io::{self, Write, BufRead};
|
use std::io::{self, Write, BufRead};
|
||||||
use std::iter::once;
|
use std::iter::once;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@ -6,6 +7,7 @@ use std::time::Instant;
|
|||||||
use heed::EnvOpenOptions;
|
use heed::EnvOpenOptions;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use milli::Index;
|
use milli::Index;
|
||||||
|
use oxidized_mtbl::Reader;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
|
||||||
#[cfg(target_os = "linux")]
|
#[cfg(target_os = "linux")]
|
||||||
@ -42,10 +44,17 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.map_size(100 * 1024 * 1024 * 1024) // 100 GB
|
.map_size(100 * 1024 * 1024 * 1024) // 100 GB
|
||||||
.max_readers(10)
|
.max_readers(10)
|
||||||
.max_dbs(10)
|
.max_dbs(10)
|
||||||
.open(opt.database)?;
|
.open(&opt.database)?;
|
||||||
|
|
||||||
|
// Open the LMDB database.
|
||||||
let index = Index::new(&env)?;
|
let index = Index::new(&env)?;
|
||||||
|
|
||||||
|
// Open the documents MTBL database.
|
||||||
|
let path = opt.database.join("documents.mtbl");
|
||||||
|
let file = File::open(path)?;
|
||||||
|
let mmap = unsafe { memmap::Mmap::map(&file)? };
|
||||||
|
let documents = Reader::new(mmap.as_ref())?;
|
||||||
|
|
||||||
let rtxn = env.read_txn()?;
|
let rtxn = env.read_txn()?;
|
||||||
|
|
||||||
let stdin = io::stdin();
|
let stdin = io::stdin();
|
||||||
@ -67,7 +76,6 @@ fn main() -> anyhow::Result<()> {
|
|||||||
let mut stdout = io::stdout();
|
let mut stdout = io::stdout();
|
||||||
stdout.write_all(&headers)?;
|
stdout.write_all(&headers)?;
|
||||||
|
|
||||||
let documents = index.documents(&rtxn)?.unwrap();
|
|
||||||
for id in &documents_ids {
|
for id in &documents_ids {
|
||||||
let id_bytes = id.to_be_bytes();
|
let id_bytes = id.to_be_bytes();
|
||||||
if let Some(content) = documents.clone().get(&id_bytes)? {
|
if let Some(content) = documents.clone().get(&id_bytes)? {
|
||||||
|
@ -4,10 +4,12 @@ use std::fs::File;
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use askama_warp::Template;
|
use askama_warp::Template;
|
||||||
use heed::EnvOpenOptions;
|
use heed::EnvOpenOptions;
|
||||||
|
use oxidized_mtbl::Reader;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use slice_group_by::StrGroupBy;
|
use slice_group_by::StrGroupBy;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
@ -57,6 +59,21 @@ fn highlight_string(string: &str, words: &HashSet<String>) -> String {
|
|||||||
output
|
output
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO find a better way or move this elsewhere
|
||||||
|
struct TransitiveArc<T>(Arc<T>);
|
||||||
|
|
||||||
|
impl<T: AsRef<[u8]>> AsRef<[u8]> for TransitiveArc<T> {
|
||||||
|
fn as_ref(&self) -> &[u8] {
|
||||||
|
self.0.as_ref().as_ref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for TransitiveArc<T> {
|
||||||
|
fn clone(&self) -> TransitiveArc<T> {
|
||||||
|
TransitiveArc(self.0.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Template)]
|
#[derive(Template)]
|
||||||
#[template(path = "index.html")]
|
#[template(path = "index.html")]
|
||||||
struct IndexTemplate {
|
struct IndexTemplate {
|
||||||
@ -81,13 +98,23 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.max_dbs(10)
|
.max_dbs(10)
|
||||||
.open(&opt.database)?;
|
.open(&opt.database)?;
|
||||||
|
|
||||||
|
// Open the LMDB database.
|
||||||
let index = Index::new(&env)?;
|
let index = Index::new(&env)?;
|
||||||
|
|
||||||
|
// Open the documents MTBL database.
|
||||||
|
let path = opt.database.join("documents.mtbl");
|
||||||
|
let file = File::open(path)?;
|
||||||
|
let mmap = unsafe { memmap::Mmap::map(&file)? };
|
||||||
|
let mmap = TransitiveArc(Arc::new(mmap));
|
||||||
|
let documents = Reader::new(mmap)?;
|
||||||
|
|
||||||
// Retrieve the database the file stem (w/o the extension),
|
// Retrieve the database the file stem (w/o the extension),
|
||||||
// the disk file size and the number of documents in the database.
|
// the disk file size and the number of documents in the database.
|
||||||
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
|
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
|
||||||
let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
|
let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
|
||||||
let docs_count = env.read_txn().and_then(|r| Ok(index.documents(&r).unwrap().unwrap().metadata().count_entries))?;
|
|
||||||
|
// Retrieve the documents count.
|
||||||
|
let docs_count = documents.metadata().count_entries;
|
||||||
|
|
||||||
// We run and wait on the HTTP server
|
// We run and wait on the HTTP server
|
||||||
|
|
||||||
@ -171,6 +198,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let env_cloned = env.clone();
|
let env_cloned = env.clone();
|
||||||
|
let documents_cloned = documents.clone();
|
||||||
let disable_highlighting = opt.disable_highlighting;
|
let disable_highlighting = opt.disable_highlighting;
|
||||||
let query_route = warp::filters::method::post()
|
let query_route = warp::filters::method::post()
|
||||||
.and(warp::path!("query"))
|
.and(warp::path!("query"))
|
||||||
@ -185,11 +213,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
if let Some(headers) = index.headers(&rtxn).unwrap() {
|
if let Some(headers) = index.headers(&rtxn).unwrap() {
|
||||||
// We write the headers
|
// We write the headers
|
||||||
body.extend_from_slice(headers);
|
body.extend_from_slice(headers);
|
||||||
let documents = index.documents(&rtxn).unwrap().unwrap();
|
|
||||||
|
|
||||||
for id in documents_ids {
|
for id in documents_ids {
|
||||||
let id_bytes = id.to_be_bytes();
|
let id_bytes = id.to_be_bytes();
|
||||||
let content = documents.clone().get(&id_bytes).unwrap();
|
let content = documents_cloned.clone().get(&id_bytes).unwrap();
|
||||||
let content = content.expect(&format!("could not find document {}", id));
|
let content = content.expect(&format!("could not find document {}", id));
|
||||||
let content = std::str::from_utf8(content.as_ref()).unwrap();
|
let content = std::str::from_utf8(content.as_ref()).unwrap();
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ use heed::{PolyDatabase, Database};
|
|||||||
use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder;
|
use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use oxidized_mtbl::Reader;
|
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use self::best_proximity::BestProximity;
|
use self::best_proximity::BestProximity;
|
||||||
@ -72,13 +71,6 @@ impl Index {
|
|||||||
self.main.get::<_, Str, ByteSlice>(rtxn, "headers")
|
self.main.get::<_, Str, ByteSlice>(rtxn, "headers")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<Reader<&'t [u8]>>> {
|
|
||||||
match self.main.get::<_, Str, ByteSlice>(rtxn, "documents")? {
|
|
||||||
Some(bytes) => Ok(Some(Reader::new(bytes)?)),
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
|
pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
|
||||||
match self.headers(rtxn)? {
|
match self.headers(rtxn)? {
|
||||||
Some(headers) => {
|
Some(headers) => {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user