Use an LMDB database to store the external documents ids

This commit is contained in:
Clément Renault 2023-10-28 12:56:46 +02:00 committed by Louis Dureuil
parent fdf3f7f627
commit dfab6293c9
No known key found for this signature in database
7 changed files with 79 additions and 141 deletions

View file

@ -1,12 +1,11 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt;
use fst::Streamer;
use heed::types::{OwnedType, Str};
use heed::{Database, RoIter, RoTxn, RwTxn};
use roaring::RoaringBitmap;
use crate::DocumentId;
use crate::{DocumentId, BEU32};
pub enum DocumentOperationKind {
Create,
@ -19,41 +18,31 @@ pub struct DocumentOperation {
pub kind: DocumentOperationKind,
}
pub struct ExternalDocumentsIds<'a>(fst::Map<Cow<'a, [u8]>>);
pub struct ExternalDocumentsIds(Database<Str, OwnedType<BEU32>>);
impl<'a> ExternalDocumentsIds<'a> {
pub fn new(fst: fst::Map<Cow<'a, [u8]>>) -> ExternalDocumentsIds<'a> {
ExternalDocumentsIds(fst)
}
pub fn into_static(self) -> ExternalDocumentsIds<'static> {
ExternalDocumentsIds(self.0.map_data(|c| Cow::Owned(c.into_owned())).unwrap())
impl ExternalDocumentsIds {
pub fn new(db: Database<Str, OwnedType<BEU32>>) -> ExternalDocumentsIds {
ExternalDocumentsIds(db)
}
/// Returns `true` if hard and soft external documents lists are empty.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
pub fn is_empty(&self, rtxn: &RoTxn) -> heed::Result<bool> {
self.0.is_empty(rtxn).map_err(Into::into)
}
pub fn get<A: AsRef<[u8]>>(&self, external_id: A) -> Option<u32> {
let external_id = external_id.as_ref();
self.0.get(external_id).map(|x| x.try_into().unwrap())
pub fn get<A: AsRef<str>>(&self, rtxn: &RoTxn, external_id: A) -> heed::Result<Option<u32>> {
Ok(self.0.get(rtxn, external_id.as_ref())?.map(|x| x.get().try_into().unwrap()))
}
/// An helper function to debug this type, returns an `HashMap` of both,
/// soft and hard fst maps, combined.
pub fn to_hash_map(&self) -> HashMap<String, u32> {
pub fn to_hash_map(&self, rtxn: &RoTxn) -> heed::Result<HashMap<String, u32>> {
let mut map = HashMap::default();
let mut stream = self.0.stream();
while let Some((k, v)) = stream.next() {
let k = String::from_utf8(k.to_vec()).unwrap();
map.insert(k, v.try_into().unwrap());
for result in self.0.iter(rtxn)? {
let (external, internal) = result?;
map.insert(external.to_owned(), internal.get().try_into().unwrap());
}
map
}
pub fn as_bytes(&self) -> &[u8] {
self.0.as_fst().as_bytes()
Ok(map)
}
/// Looks for the internal ids in the passed bitmap, and returns an iterator over the mapping between
@ -65,12 +54,12 @@ impl<'a> ExternalDocumentsIds<'a> {
/// - `Err(remaining_ids)`: if the external ids for some of the requested internal ids weren't found.
/// In that case the returned bitmap contains the internal ids whose external ids were not found after traversing
/// the entire fst.
pub fn find_external_id_of(
pub fn find_external_id_of<'t>(
&self,
rtxn: &'t RoTxn,
internal_ids: RoaringBitmap,
) -> ExternalToInternalOwnedIterator<'_> {
let it = ExternalToInternalOwnedIterator { stream: self.0.stream(), internal_ids };
it
) -> heed::Result<ExternalToInternalOwnedIterator<'t>> {
self.0.iter(rtxn).map(|iter| ExternalToInternalOwnedIterator { iter, internal_ids })
}
/// Applies the list of operations passed as argument, modifying the current external to internal id mapping.
@ -81,84 +70,39 @@ impl<'a> ExternalDocumentsIds<'a> {
///
/// - If attempting to delete a document that doesn't exist
/// - If attempting to create a document that already exists
pub fn apply(&mut self, mut operations: Vec<DocumentOperation>) {
operations.sort_unstable_by(|left, right| left.external_id.cmp(&right.external_id));
operations.dedup_by(|left, right| left.external_id == right.external_id);
let mut builder = fst::MapBuilder::memory();
let mut stream = self.0.stream();
let mut next_stream = stream.next();
let mut operations = operations.iter();
let mut next_operation = operations.next();
loop {
(next_stream, next_operation) = match (next_stream.take(), next_operation.take()) {
(None, None) => break,
(None, Some(DocumentOperation { external_id, internal_id, kind })) => {
if matches!(kind, DocumentOperationKind::Delete) {
pub fn apply(&self, wtxn: &mut RwTxn, operations: Vec<DocumentOperation>) -> heed::Result<()> {
for DocumentOperation { external_id, internal_id, kind } in operations {
match kind {
DocumentOperationKind::Create => {
// TODO should we get before insert to be able to detect bugs?
// if matches!(kind, DocumentOperationKind::Create) {
// panic!("Attempting to create an already-existing document");
// }
self.0.put(wtxn, &external_id, &BEU32::new(internal_id))?;
}
DocumentOperationKind::Delete => {
if !self.0.delete(wtxn, &external_id)? {
panic!("Attempting to delete a non-existing document")
}
builder.insert(external_id, (*internal_id).into()).unwrap();
(None, operations.next())
}
(Some((k, v)), None) => {
builder.insert(k, v).unwrap();
(stream.next(), None)
}
(
current_stream @ Some((left_external_id, left_internal_id)),
current_operation @ Some(DocumentOperation {
external_id: right_external_id,
internal_id: right_internal_id,
kind,
}),
) => match left_external_id.cmp(right_external_id.as_bytes()) {
std::cmp::Ordering::Less => {
builder.insert(left_external_id, left_internal_id).unwrap();
(stream.next(), current_operation)
}
std::cmp::Ordering::Greater => {
builder.insert(right_external_id, (*right_internal_id).into()).unwrap();
(current_stream, operations.next())
}
std::cmp::Ordering::Equal => {
if matches!(kind, DocumentOperationKind::Create) {
panic!("Attempting to create an already-existing document");
}
// we delete the document, so we just advance both iterators to skip in stream
(stream.next(), operations.next())
}
},
}
}
self.0 = builder.into_map().map_data(Cow::Owned).unwrap();
}
}
impl fmt::Debug for ExternalDocumentsIds<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("ExternalDocumentsIds").field(&self.to_hash_map()).finish()
}
}
impl Default for ExternalDocumentsIds<'static> {
fn default() -> Self {
ExternalDocumentsIds(fst::Map::default().map_data(Cow::Owned).unwrap())
Ok(())
}
}
/// An iterator over mappings between requested internal ids and external ids.
///
/// See [`ExternalDocumentsIds::find_external_id_of`] for details.
pub struct ExternalToInternalOwnedIterator<'it> {
stream: fst::map::Stream<'it>,
pub struct ExternalToInternalOwnedIterator<'t> {
iter: RoIter<'t, Str, OwnedType<BEU32>>,
internal_ids: RoaringBitmap,
}
impl<'it> Iterator for ExternalToInternalOwnedIterator<'it> {
impl<'t> Iterator for ExternalToInternalOwnedIterator<'t> {
/// A result indicating if a mapping was found, or if the stream was exhausted without finding all internal ids.
type Item = Result<(String, DocumentId), RoaringBitmap>;
type Item = Result<(&'t str, DocumentId), RoaringBitmap>;
fn next(&mut self) -> Option<Self::Item> {
// if all requested ids were found, we won't find any other, so short-circuit
@ -166,23 +110,28 @@ impl<'it> Iterator for ExternalToInternalOwnedIterator<'it> {
return None;
}
loop {
let Some((external, internal)) = self.stream.next() else {
// we exhausted the stream but we still have some internal ids to find
let remaining_ids = std::mem::take(&mut self.internal_ids);
return Some(Err(remaining_ids));
// note: next calls to `next` will return `None` since we replaced the internal_ids
// with the default empty bitmap
let (external, internal) = match self.iter.next() {
Some(Ok((external, internal))) => (external, internal),
// TODO manage this better, remove panic
Some(Err(e)) => panic!("{}", e),
_ => {
// we exhausted the stream but we still have some internal ids to find
let remaining_ids = std::mem::take(&mut self.internal_ids);
return Some(Err(remaining_ids));
// note: next calls to `next` will return `None` since we replaced the internal_ids
// with the default empty bitmap
}
};
let internal = internal.try_into().unwrap();
let internal = internal.get();
let was_contained = self.internal_ids.remove(internal);
if was_contained {
return Some(Ok((std::str::from_utf8(external).unwrap().to_owned(), internal)));
return Some(Ok((external, internal)));
}
}
}
}
impl<'it> ExternalToInternalOwnedIterator<'it> {
impl<'t> ExternalToInternalOwnedIterator<'t> {
/// Returns the bitmap of internal ids whose external id are yet to be found
pub fn remaining_internal_ids(&self) -> &RoaringBitmap {
&self.internal_ids
@ -191,7 +140,7 @@ impl<'it> ExternalToInternalOwnedIterator<'it> {
/// Consumes this iterator and returns an iterator over only the external ids, ignoring the internal ids.
///
/// Use this when you don't need the mapping between the external and the internal ids.
pub fn only_external_ids(self) -> impl Iterator<Item = Result<String, RoaringBitmap>> + 'it {
self.map(|res| res.map(|(external, _internal)| external))
pub fn only_external_ids(self) -> impl Iterator<Item = Result<String, RoaringBitmap>> + 't {
self.map(|res| res.map(|(external, _internal)| external.to_owned()))
}
}