mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-04 18:45:46 +01:00
Export grenad operations to file
This commit is contained in:
parent
82b43e9a7f
commit
0f589b9bcd
@ -1,14 +1,17 @@
|
|||||||
use std::collections::{BTreeSet, HashSet};
|
use std::collections::{BTreeSet, HashSet};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufReader};
|
use std::io::{self, BufReader};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use heed::BytesDecode;
|
use heed::BytesDecode;
|
||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
|
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
|
||||||
try_split_array_at, writer_into_reader, GrenadParameters,
|
try_split_array_at, writer_into_reader, GrenadParameters,
|
||||||
};
|
};
|
||||||
|
use super::RawKVWriter;
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::heed_codec::StrBEU16Codec;
|
use crate::heed_codec::StrBEU16Codec;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
@ -16,6 +19,11 @@ use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWri
|
|||||||
use crate::update::MergeFn;
|
use crate::update::MergeFn;
|
||||||
use crate::{DocumentId, FieldId, Result};
|
use crate::{DocumentId, FieldId, Result};
|
||||||
|
|
||||||
|
static WORD_FID_DOCIDS_RAW_KV: Lazy<Mutex<RawKVWriter>> =
|
||||||
|
Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_fid_docids").unwrap()));
|
||||||
|
static WORD_DOCIDS_RAW_KV: Lazy<Mutex<RawKVWriter>> =
|
||||||
|
Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_docids").unwrap()));
|
||||||
|
|
||||||
/// Extracts the word and the documents ids where this word appear.
|
/// Extracts the word and the documents ids where this word appear.
|
||||||
///
|
///
|
||||||
/// Returns a grenad reader with the list of extracted words and
|
/// Returns a grenad reader with the list of extracted words and
|
||||||
@ -109,6 +117,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
tempfile::tempfile()?,
|
tempfile::tempfile()?,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut word_docids_raw_kv = WORD_DOCIDS_RAW_KV.lock().unwrap();
|
||||||
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
|
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
|
||||||
// TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
|
// TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
|
||||||
while let Some((key, value)) = iter.next()? {
|
while let Some((key, value)) = iter.next()? {
|
||||||
@ -124,10 +133,14 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
|||||||
if exact_attributes.contains(&fid) {
|
if exact_attributes.contains(&fid) {
|
||||||
exact_word_docids_sorter.insert(word.as_bytes(), value)?;
|
exact_word_docids_sorter.insert(word.as_bytes(), value)?;
|
||||||
} else {
|
} else {
|
||||||
|
word_docids_raw_kv.push(word.as_bytes(), value).unwrap();
|
||||||
word_docids_sorter.insert(word.as_bytes(), value)?;
|
word_docids_sorter.insert(word.as_bytes(), value)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WORD_FID_DOCIDS_RAW_KV.lock().unwrap().flush().unwrap();
|
||||||
|
word_docids_raw_kv.flush().unwrap();
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
sorter_into_reader(word_docids_sorter, indexer)?,
|
sorter_into_reader(word_docids_sorter, indexer)?,
|
||||||
sorter_into_reader(exact_word_docids_sorter, indexer)?,
|
sorter_into_reader(exact_word_docids_sorter, indexer)?,
|
||||||
@ -146,6 +159,8 @@ fn words_into_sorter(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
puffin::profile_function!();
|
puffin::profile_function!();
|
||||||
|
|
||||||
|
let mut raw_kv_word_fid_docids = WORD_FID_DOCIDS_RAW_KV.lock().unwrap();
|
||||||
|
|
||||||
use itertools::merge_join_by;
|
use itertools::merge_join_by;
|
||||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
@ -173,7 +188,9 @@ fn words_into_sorter(
|
|||||||
key_buffer.extend_from_slice(word_bytes);
|
key_buffer.extend_from_slice(word_bytes);
|
||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
||||||
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
let value_buffer = value_writer.into_inner().unwrap();
|
||||||
|
raw_kv_word_fid_docids.push(key_buffer, value_buffer).unwrap();
|
||||||
|
word_fid_docids_sorter.insert(&key_buffer, value_buffer)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1,20 +1,26 @@
|
|||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
|
use std::sync::Mutex;
|
||||||
use std::{cmp, io};
|
use std::{cmp, io};
|
||||||
|
|
||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
|
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
|
||||||
writer_into_reader, GrenadParameters, MergeFn,
|
writer_into_reader, GrenadParameters, MergeFn,
|
||||||
};
|
};
|
||||||
|
use super::RawKVWriter;
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::proximity::{index_proximity, MAX_DISTANCE};
|
use crate::proximity::{index_proximity, MAX_DISTANCE};
|
||||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::{DocumentId, Result};
|
use crate::{DocumentId, Result};
|
||||||
|
|
||||||
|
static WORD_PAIR_PROXIMITY_DOCIDS_RAW_KV: Lazy<Mutex<RawKVWriter>> =
|
||||||
|
Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_pair_proximity_docids").unwrap()));
|
||||||
|
|
||||||
/// Extracts the best proximity between pairs of words and the documents ids where this pair appear.
|
/// Extracts the best proximity between pairs of words and the documents ids where this pair appear.
|
||||||
///
|
///
|
||||||
/// Returns a grenad reader with the list of extracted word pairs proximities and
|
/// Returns a grenad reader with the list of extracted word pairs proximities and
|
||||||
@ -153,6 +159,9 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
&mut word_pair_proximity_docids_sorters,
|
&mut word_pair_proximity_docids_sorters,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WORD_PAIR_PROXIMITY_DOCIDS_RAW_KV.lock().unwrap().flush().unwrap();
|
||||||
|
|
||||||
{
|
{
|
||||||
puffin::profile_scope!("sorter_into_reader");
|
puffin::profile_scope!("sorter_into_reader");
|
||||||
// FIXME: span inside of a hot loop might degrade performance and create big reports
|
// FIXME: span inside of a hot loop might degrade performance and create big reports
|
||||||
@ -186,6 +195,8 @@ fn document_word_positions_into_sorter(
|
|||||||
use itertools::merge_join_by;
|
use itertools::merge_join_by;
|
||||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
|
let mut word_pair_proximity_docids_raw_kv = WORD_PAIR_PROXIMITY_DOCIDS_RAW_KV.lock().unwrap();
|
||||||
|
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
let mut key_buffer = Vec::new();
|
let mut key_buffer = Vec::new();
|
||||||
for eob in
|
for eob in
|
||||||
@ -217,8 +228,9 @@ fn document_word_positions_into_sorter(
|
|||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
key_buffer.extend_from_slice(w2.as_bytes());
|
key_buffer.extend_from_slice(w2.as_bytes());
|
||||||
|
|
||||||
word_pair_proximity_docids_sorters[*prox as usize - 1]
|
let value_buffer = value_writer.into_inner().unwrap();
|
||||||
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
word_pair_proximity_docids_raw_kv.push(&key_buffer, value_buffer).unwrap();
|
||||||
|
word_pair_proximity_docids_sorters[*prox as usize - 1].insert(&key_buffer, value_buffer)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1,19 +1,25 @@
|
|||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufReader};
|
use std::io::{self, BufReader};
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use obkv::KvReaderU16;
|
use obkv::KvReaderU16;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
|
||||||
GrenadParameters,
|
GrenadParameters,
|
||||||
};
|
};
|
||||||
|
use super::RawKVWriter;
|
||||||
use crate::error::SerializationError;
|
use crate::error::SerializationError;
|
||||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::update::MergeFn;
|
use crate::update::MergeFn;
|
||||||
use crate::{bucketed_position, DocumentId, Result};
|
use crate::{bucketed_position, DocumentId, Result};
|
||||||
|
|
||||||
|
static WORD_POSITION_DOCIDS_RAW_KV: Lazy<Mutex<RawKVWriter>> =
|
||||||
|
Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_position_docids").unwrap()));
|
||||||
|
|
||||||
/// Extracts the word positions and the documents ids where this word appear.
|
/// Extracts the word positions and the documents ids where this word appear.
|
||||||
///
|
///
|
||||||
/// Returns a grenad reader with the list of extracted words at positions and
|
/// Returns a grenad reader with the list of extracted words at positions and
|
||||||
@ -88,6 +94,8 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
|||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WORD_POSITION_DOCIDS_RAW_KV.lock().unwrap().flush().unwrap();
|
||||||
|
|
||||||
// TODO remove noop DelAdd OBKV
|
// TODO remove noop DelAdd OBKV
|
||||||
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
|
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
|
||||||
|
|
||||||
@ -107,6 +115,8 @@ fn words_position_into_sorter(
|
|||||||
use itertools::merge_join_by;
|
use itertools::merge_join_by;
|
||||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
|
let mut word_position_docids_raw_kv = WORD_POSITION_DOCIDS_RAW_KV.lock().unwrap();
|
||||||
|
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
|
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
|
||||||
{
|
{
|
||||||
@ -133,7 +143,10 @@ fn words_position_into_sorter(
|
|||||||
key_buffer.extend_from_slice(word_bytes);
|
key_buffer.extend_from_slice(word_bytes);
|
||||||
key_buffer.push(0);
|
key_buffer.push(0);
|
||||||
key_buffer.extend_from_slice(&position.to_be_bytes());
|
key_buffer.extend_from_slice(&position.to_be_bytes());
|
||||||
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
|
||||||
|
let value_buffer = value_writer.into_inner().unwrap();
|
||||||
|
word_position_docids_raw_kv.push(key_buffer, value_buffer).unwrap();
|
||||||
|
word_position_docids_sorter.insert(&key_buffer, &value_buffer)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -11,7 +11,7 @@ mod extract_word_position_docids;
|
|||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufReader;
|
use std::io::{self, BufReader, Write};
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::Sender;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
@ -38,6 +38,30 @@ use crate::proximity::ProximityPrecision;
|
|||||||
use crate::vector::EmbeddingConfigs;
|
use crate::vector::EmbeddingConfigs;
|
||||||
use crate::{FieldId, FieldsIdsMap, Result};
|
use crate::{FieldId, FieldsIdsMap, Result};
|
||||||
|
|
||||||
|
pub struct RawKVWriter {
|
||||||
|
file: io::BufWriter<File>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RawKVWriter {
|
||||||
|
pub fn new(path: &str) -> io::Result<Self> {
|
||||||
|
Ok(Self { file: File::create(path).map(io::BufWriter::new)? })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
|
||||||
|
let key_len = key.len().to_be_bytes();
|
||||||
|
let value_len = value.len().to_be_bytes();
|
||||||
|
self.file.write_all(&key_len)?;
|
||||||
|
self.file.write_all(key)?;
|
||||||
|
self.file.write_all(&value_len)?;
|
||||||
|
self.file.write_all(value)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flush(&mut self) -> io::Result<()> {
|
||||||
|
self.file.flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Extract data for each databases from obkv documents in parallel.
|
/// Extract data for each databases from obkv documents in parallel.
|
||||||
/// Send data in grenad file over provided Sender.
|
/// Send data in grenad file over provided Sender.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
Loading…
Reference in New Issue
Block a user