WIP geo fields

This commit is contained in:
Clément Renault 2024-09-12 18:01:02 +02:00
parent b2f4e67c9a
commit f7652186e1
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 152 additions and 71 deletions

View File

@ -323,6 +323,7 @@ pub enum MergerOperation {
WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), WordPositionDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>),
DeleteDocument { docid: DocumentId }, DeleteDocument { docid: DocumentId },
InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> }, InsertDocument { docid: DocumentId, document: Box<KvReaderFieldId> },
FinishedDocument,
} }
pub struct MergerReceiver(Receiver<MergerOperation>); pub struct MergerReceiver(Receiver<MergerOperation>);
@ -339,22 +340,8 @@ impl IntoIterator for MergerReceiver {
pub struct ExtractorSender(Sender<MergerOperation>); pub struct ExtractorSender(Sender<MergerOperation>);
impl ExtractorSender { impl ExtractorSender {
pub fn document_insert( pub fn document_sender(&self) -> DocumentSender<'_> {
&self, DocumentSender(&self.0)
docid: DocumentId,
document: Box<KvReaderFieldId>,
) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::InsertDocument { docid, document }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn document_delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::DeleteDocument { docid }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
} }
pub fn send_searchable<D: DatabaseType>( pub fn send_searchable<D: DatabaseType>(
@ -367,3 +354,38 @@ impl ExtractorSender {
} }
} }
} }
pub struct DocumentSender<'a>(&'a Sender<MergerOperation>);
impl DocumentSender<'_> {
pub fn insert(
&self,
docid: DocumentId,
document: Box<KvReaderFieldId>,
) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::InsertDocument { docid, document }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::DeleteDocument { docid }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
pub fn finish(self) -> StdResult<(), SendError<()>> {
match self.0.send(MergerOperation::FinishedDocument) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
}
}
impl Drop for DocumentSender<'_> {
fn drop(&mut self) {
self.0.send(MergerOperation::FinishedDocument);
}
}

View File

@ -12,20 +12,17 @@ pub enum DocumentChange {
pub struct Deletion { pub struct Deletion {
docid: DocumentId, docid: DocumentId,
external_docid: String, // ? current: Box<KvReaderFieldId>,
current: Box<KvReaderFieldId>, // ?
} }
pub struct Update { pub struct Update {
docid: DocumentId, docid: DocumentId,
external_docid: String, // ? current: Box<KvReaderFieldId>,
current: Box<KvReaderFieldId>, // ?
new: Box<KvReaderFieldId>, new: Box<KvReaderFieldId>,
} }
pub struct Insertion { pub struct Insertion {
docid: DocumentId, docid: DocumentId,
external_docid: String, // ?
new: Box<KvReaderFieldId>, new: Box<KvReaderFieldId>,
} }
@ -40,12 +37,8 @@ impl DocumentChange {
} }
impl Deletion { impl Deletion {
pub fn create( pub fn create(docid: DocumentId, current: Box<KvReaderFieldId>) -> Self {
docid: DocumentId, Self { docid, current }
external_docid: String,
current: Box<KvReaderFieldId>,
) -> Self {
Self { docid, external_docid, current }
} }
pub fn docid(&self) -> DocumentId { pub fn docid(&self) -> DocumentId {
@ -63,8 +56,8 @@ impl Deletion {
} }
impl Insertion { impl Insertion {
pub fn create(docid: DocumentId, external_docid: String, new: Box<KvReaderFieldId>) -> Self { pub fn create(docid: DocumentId, new: Box<KvReaderFieldId>) -> Self {
Insertion { docid, external_docid, new } Insertion { docid, new }
} }
pub fn docid(&self) -> DocumentId { pub fn docid(&self) -> DocumentId {
@ -79,11 +72,10 @@ impl Insertion {
impl Update { impl Update {
pub fn create( pub fn create(
docid: DocumentId, docid: DocumentId,
external_docid: String,
current: Box<KvReaderFieldId>, current: Box<KvReaderFieldId>,
new: Box<KvReaderFieldId>, new: Box<KvReaderFieldId>,
) -> Self { ) -> Self {
Update { docid, external_docid, current, new } Update { docid, current, new }
} }
pub fn docid(&self) -> DocumentId { pub fn docid(&self) -> DocumentId {

View File

@ -4,9 +4,8 @@ use rayon::iter::{ParallelBridge, ParallelIterator};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::DocumentChanges; use super::DocumentChanges;
use crate::documents::PrimaryKey;
use crate::update::new::{Deletion, DocumentChange, ItemsPool}; use crate::update::new::{Deletion, DocumentChange, ItemsPool};
use crate::{FieldsIdsMap, Index, InternalError, Result}; use crate::{FieldsIdsMap, Index, Result};
pub struct DocumentDeletion { pub struct DocumentDeletion {
pub to_delete: RoaringBitmap, pub to_delete: RoaringBitmap,
@ -23,31 +22,19 @@ impl DocumentDeletion {
} }
impl<'p> DocumentChanges<'p> for DocumentDeletion { impl<'p> DocumentChanges<'p> for DocumentDeletion {
type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>); type Parameter = &'p Index;
fn document_changes( fn document_changes(
self, self,
_fields_ids_map: &mut FieldsIdsMap, _fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter, param: Self::Parameter,
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> { ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
let (index, fields, primary_key) = param; let index = param;
let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| {
items.with(|rtxn| { items.with(|rtxn| {
let current = index.document(rtxn, docid)?; let current = index.document(rtxn, docid)?;
let external_docid = match primary_key.document_id(current, fields)? { Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed())))
Ok(document_id) => Ok(document_id) as Result<_>,
Err(_) => Err(InternalError::DocumentsError(
crate::documents::Error::InvalidDocumentFormat,
)
.into()),
}?;
Ok(DocumentChange::Deletion(Deletion::create(
docid,
external_docid,
current.boxed(),
)))
}) })
})) }))
} }

View File

@ -6,7 +6,6 @@ use heed::types::Bytes;
use heed::RoTxn; use heed::RoTxn;
use memmap2::Mmap; use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde_json::from_str;
use IndexDocumentsMethod as Idm; use IndexDocumentsMethod as Idm;
use super::super::document_change::DocumentChange; use super::super::document_change::DocumentChange;
@ -289,20 +288,17 @@ impl MergeChanges for MergeDocumentForReplacement {
let new = writer.into_boxed(); let new = writer.into_boxed();
match current { match current {
Some(current) => { Some(current) => Ok(Some(DocumentChange::Update(Update::create(
let update = Update::create(docid, external_docid, current.boxed(), new); docid,
Ok(Some(DocumentChange::Update(update))) current.boxed(),
} new,
None => { )))),
let insertion = Insertion::create(docid, external_docid, new); None => Ok(Some(DocumentChange::Insertion(Insertion::create(docid, new)))),
Ok(Some(DocumentChange::Insertion(insertion)))
}
} }
} }
Some(InnerDocOp::Deletion) => match current { Some(InnerDocOp::Deletion) => match current {
Some(current) => { Some(current) => {
let deletion = Deletion::create(docid, external_docid, current.boxed()); Ok(Some(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))))
Ok(Some(DocumentChange::Deletion(deletion)))
} }
None => Ok(None), None => Ok(None),
}, },
@ -361,7 +357,7 @@ impl MergeChanges for MergeDocumentForUpdates {
if operations.is_empty() { if operations.is_empty() {
match current { match current {
Some(current) => { Some(current) => {
let deletion = Deletion::create(docid, external_docid, current.boxed()); let deletion = Deletion::create(docid, current.boxed());
return Ok(Some(DocumentChange::Deletion(deletion))); return Ok(Some(DocumentChange::Deletion(deletion)));
} }
None => return Ok(None), None => return Ok(None),
@ -389,11 +385,11 @@ impl MergeChanges for MergeDocumentForUpdates {
match current { match current {
Some(current) => { Some(current) => {
let update = Update::create(docid, external_docid, current.boxed(), new); let update = Update::create(docid, current.boxed(), new);
Ok(Some(DocumentChange::Update(update))) Ok(Some(DocumentChange::Update(update)))
} }
None => { None => {
let insertion = Insertion::create(docid, external_docid, new); let insertion = Insertion::create(docid, new);
Ok(Some(DocumentChange::Insertion(insertion))) Ok(Some(DocumentChange::Insertion(insertion)))
} }
} }

View File

@ -59,6 +59,7 @@ where
let fields_ids_map_lock = RwLock::new(fields_ids_map); let fields_ids_map_lock = RwLock::new(fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
let global_fields_ids_map_clone = global_fields_ids_map.clone();
thread::scope(|s| { thread::scope(|s| {
// TODO manage the errors correctly // TODO manage the errors correctly
@ -70,27 +71,30 @@ where
let document_changes = document_changes.into_par_iter(); let document_changes = document_changes.into_par_iter();
// document but we need to create a function that collects and compresses documents. // document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.document_sender();
document_changes.clone().into_par_iter().try_for_each(|result| { document_changes.clone().into_par_iter().try_for_each(|result| {
match result? { match result? {
DocumentChange::Deletion(deletion) => { DocumentChange::Deletion(deletion) => {
let docid = deletion.docid(); let docid = deletion.docid();
extractor_sender.document_delete(docid).unwrap(); document_sender.delete(docid).unwrap();
} }
DocumentChange::Update(update) => { DocumentChange::Update(update) => {
let docid = update.docid(); let docid = update.docid();
let content = update.new(); let content = update.new();
extractor_sender.document_insert(docid, content.boxed()).unwrap(); document_sender.insert(docid, content.boxed()).unwrap();
} }
DocumentChange::Insertion(insertion) => { DocumentChange::Insertion(insertion) => {
let docid = insertion.docid(); let docid = insertion.docid();
let content = insertion.new(); let content = insertion.new();
extractor_sender.document_insert(docid, content.boxed()).unwrap(); document_sender.insert(docid, content.boxed()).unwrap();
// extracted_dictionary_sender.send(self, dictionary: &[u8]); // extracted_dictionary_sender.send(self, dictionary: &[u8]);
} }
} }
Ok(()) as Result<_> Ok(()) as Result<_>
})?; })?;
document_sender.finish().unwrap();
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let max_memory = TEN_GIB / dbg!(rayon::current_num_threads()); let max_memory = TEN_GIB / dbg!(rayon::current_num_threads());
let grenad_parameters = GrenadParameters { let grenad_parameters = GrenadParameters {
@ -197,7 +201,13 @@ where
tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge"); tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge");
let _entered = span.enter(); let _entered = span.enter();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index) merge_grenad_entries(
merger_receiver,
merger_sender,
&rtxn,
index,
global_fields_ids_map_clone,
)
})?; })?;
for operation in writer_receiver { for operation in writer_receiver {

View File

@ -68,7 +68,7 @@ where
} }
}?; }?;
let insertion = Insertion::create(docid, external_docid, document); let insertion = Insertion::create(docid, document);
Ok(DocumentChange::Insertion(insertion)) Ok(DocumentChange::Insertion(insertion))
})) }))
} }

View File

@ -1,20 +1,23 @@
use std::fs::File; use std::fs::File;
use std::io::{self, BufWriter};
use bincode::ErrorKind;
use fst::{Set, SetBuilder}; use fst::{Set, SetBuilder};
use grenad::Merger; use grenad::Merger;
use heed::types::Bytes; use heed::types::Bytes;
use heed::{Database, RoTxn}; use heed::{BoxedError, Database, RoTxn};
use memmap2::Mmap; use memmap2::Mmap;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use std::io::BufWriter;
use tempfile::tempfile; use tempfile::tempfile;
use super::channel::*; use super::channel::*;
use super::KvReaderDelAdd; use super::{Deletion, DocumentChange, Insertion, KvReaderDelAdd, KvReaderFieldId, Update};
use crate::update::del_add::DelAdd; use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation; use crate::update::new::channel::MergerOperation;
use crate::update::MergeDeladdCboRoaringBitmaps; use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Index, Result}; use crate::{
CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, InternalError, Result,
};
/// TODO We must return some infos/stats /// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
@ -23,9 +26,11 @@ pub fn merge_grenad_entries(
sender: MergerSender, sender: MergerSender,
rtxn: &RoTxn, rtxn: &RoTxn,
index: &Index, index: &Index,
mut global_fields_ids_map: GlobalFieldsIdsMap<'_>,
) -> Result<()> { ) -> Result<()> {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let mut documents_ids = index.documents_ids(rtxn)?; let mut documents_ids = index.documents_ids(rtxn)?;
let mut geo_extractor = GeoExtractor::new(rtxn, index)?;
for merger_operation in receiver { for merger_operation in receiver {
match merger_operation { match merger_operation {
@ -125,6 +130,18 @@ pub fn merge_grenad_entries(
let _entered = span.enter(); let _entered = span.enter();
documents_ids.insert(docid); documents_ids.insert(docid);
sender.documents().uncompressed(docid, &document).unwrap(); sender.documents().uncompressed(docid, &document).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
let change = match current {
Some(current) => {
DocumentChange::Update(Update::create(docid, current.boxed(), document))
}
None => DocumentChange::Insertion(Insertion::create(docid, document)),
};
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
} }
MergerOperation::DeleteDocument { docid } => { MergerOperation::DeleteDocument { docid } => {
let span = let span =
@ -134,6 +151,15 @@ pub fn merge_grenad_entries(
unreachable!("Tried deleting a document that we do not know about"); unreachable!("Tried deleting a document that we do not know about");
} }
sender.documents().delete(docid).unwrap(); sender.documents().delete(docid).unwrap();
if let Some(geo_extractor) = geo_extractor.as_mut() {
let current = index.document(rtxn, docid)?;
let change = DocumentChange::Deletion(Deletion::create(docid, current.boxed()));
geo_extractor.manage_change(&mut global_fields_ids_map, &change)?;
}
}
MergerOperation::FinishedDocument => {
// send the rtree
} }
} }
} }
@ -153,6 +179,54 @@ pub fn merge_grenad_entries(
Ok(()) Ok(())
} }
pub struct GeoExtractor {
rtree: Option<rstar::RTree<GeoPoint>>,
}
impl GeoExtractor {
pub fn new(rtxn: &RoTxn, index: &Index) -> Result<Option<Self>> {
let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
if is_sortable || is_filterable {
Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
} else {
Ok(None)
}
}
pub fn manage_change(
&mut self,
fidmap: &mut GlobalFieldsIdsMap,
change: &DocumentChange,
) -> Result<()> {
match change {
DocumentChange::Deletion(_) => todo!(),
DocumentChange::Update(_) => todo!(),
DocumentChange::Insertion(_) => todo!(),
}
}
pub fn serialize_rtree<W: io::Write>(self, writer: &mut W) -> Result<bool> {
match self.rtree {
Some(rtree) => {
// TODO What should I do?
bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e {
ErrorKind::Io(e) => Error::IoError(e),
ErrorKind::InvalidUtf8Encoding(_) => todo!(),
ErrorKind::InvalidBoolEncoding(_) => todo!(),
ErrorKind::InvalidCharEncoding => todo!(),
ErrorKind::InvalidTagEncoding(_) => todo!(),
ErrorKind::DeserializeAnyNotSupported => todo!(),
ErrorKind::SizeLimit => todo!(),
ErrorKind::SequenceMustHaveLength => todo!(),
ErrorKind::Custom(_) => todo!(),
})
}
None => Ok(false),
}
}
}
fn compute_new_words_fst( fn compute_new_words_fst(
add_words_fst: SetBuilder<BufWriter<File>>, add_words_fst: SetBuilder<BufWriter<File>>,
del_words_fst: SetBuilder<BufWriter<File>>, del_words_fst: SetBuilder<BufWriter<File>>,