diff --git a/Cargo.lock b/Cargo.lock
index 30b1102b5..fd01352a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3664,6 +3664,7 @@ dependencies = [
  "time",
  "tokenizers",
  "tracing",
+ "uell",
  "ureq",
  "url",
  "uuid",
@@ -5792,6 +5793,15 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
 
+[[package]]
+name = "uell"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40de5982e28612e20330e77d81f1559b74f66caf3c7fc10b19ada4843f4b4fd7"
+dependencies = [
+ "bumpalo",
+]
+
 [[package]]
 name = "unescaper"
 version = "0.1.5"
diff --git a/crates/meilisearch/tests/documents/add_documents.rs b/crates/meilisearch/tests/documents/add_documents.rs
index 0209a6d57..8c9601e0f 100644
--- a/crates/meilisearch/tests/documents/add_documents.rs
+++ b/crates/meilisearch/tests/documents/add_documents.rs
@@ -2201,7 +2201,7 @@ async fn add_invalid_geo_and_then_settings() {
     let index = server.index("test");
     index.create(Some("id")).await;
 
-    // _geo is not an object
+    // _geo is not a valid object
     let documents = json!([
         {
             "id": "11",
@@ -2230,7 +2230,7 @@
     }
     "###);
 
-    let (ret, code) = index.update_settings(json!({"sortableAttributes": ["_geo"]})).await;
+    let (ret, code) = index.update_settings(json!({ "sortableAttributes": ["_geo"] })).await;
     snapshot!(code, @"202 Accepted");
     let ret = index.wait_task(ret.uid()).await;
     snapshot!(ret, @r###"
diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml
index 005393411..622292e8a 100644
--- a/crates/milli/Cargo.toml
+++ b/crates/milli/Cargo.toml
@@ -100,6 +100,7 @@ bumpalo = "3.16.0"
 thread_local = "1.1.8"
 allocator-api2 = "0.2.18"
 rustc-hash = "2.0.0"
+uell = "0.1.0"
 
 [dev-dependencies]
 mimalloc = { version = "0.1.43", default-features = false }
diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs
index 2c30220bc..a97569800 100644
--- a/crates/milli/src/update/index_documents/typed_chunk.rs
+++ b/crates/milli/src/update/index_documents/typed_chunk.rs
@@ -737,7 +737,7 @@ pub(crate) fn write_typed_chunk_into_index(
 }
 
 /// Converts the latitude and longitude back to an xyz GeoPoint.
-fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
+pub fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
     let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
     let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
     let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)];
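The value decoded by `extract_geo_point` is a 16-byte payload: a native-endian `f64` latitude followed by a native-endian `f64` longitude. A minimal standalone sketch of the same decoding, using only the standard library (`split_lat_lng` is a hypothetical helper, not part of the codebase):

```rust
/// Splits a value buffer into `[lat, lng]`, mirroring what `extract_geo_point`
/// does with `helpers::try_split_array_at::<u8, 8>`. Returns `None` when the
/// buffer is shorter than 16 bytes.
fn split_lat_lng(value: &[u8]) -> Option<[f64; 2]> {
    let lat: [u8; 8] = value.get(..8)?.try_into().ok()?;
    let lng: [u8; 8] = value.get(8..16)?.try_into().ok()?;
    Some([f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)])
}

fn main() {
    let mut buf = Vec::new();
    buf.extend_from_slice(&48.86_f64.to_ne_bytes()); // lat
    buf.extend_from_slice(&2.35_f64.to_ne_bytes()); // lng
    assert_eq!(split_lat_lng(&buf), Some([48.86, 2.35]));
}
```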
diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index 5b91ae77f..2027b4db8 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -3,9 +3,12 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
 use heed::types::Bytes;
+use memmap2::Mmap;
+use roaring::RoaringBitmap;
 
 use super::extract::FacetKind;
 use super::StdResult;
+use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
 use crate::index::IndexEmbeddingConfig;
 use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
@@ -25,9 +28,9 @@ pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver)
     )
 }
 
-pub struct KeyValueEntry {
-    pub key_length: usize,
-    pub data: Box<[u8]>,
+pub enum KeyValueEntry {
+    Small { key_length: usize, data: Box<[u8]> },
+    Large { key_entry: KeyEntry, data: Mmap },
 }
 
 impl KeyValueEntry {
@@ -35,14 +38,25 @@ impl KeyValueEntry {
         let mut data = Vec::with_capacity(key.len() + value.len());
         data.extend_from_slice(key);
         data.extend_from_slice(value);
-        KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
+        KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() }
     }
+
+    fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
+        KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value }
+    }
+
     pub fn key(&self) -> &[u8] {
-        &self.data[..self.key_length]
+        match self {
+            KeyValueEntry::Small { key_length, data } => &data[..*key_length],
+            KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(),
+        }
     }
+
     pub fn value(&self) -> &[u8] {
-        &self.data[self.key_length..]
+        match self {
+            KeyValueEntry::Small { key_length, data } => &data[*key_length..],
+            KeyValueEntry::Large { key_entry: _, data } => &data[..],
+        }
     }
 }
 
@@ -97,6 +111,7 @@ pub struct DbOperation {
 
 #[derive(Debug)]
 pub enum Database {
+    Main,
     Documents,
     ExternalDocumentsIds,
     ExactWordDocids,
@@ -115,6 +130,7 @@ pub enum Database {
 impl Database {
     pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
         match self {
+            Database::Main => index.main.remap_types(),
             Database::Documents => index.documents.remap_types(),
             Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(),
             Database::ExactWordDocids => index.exact_word_docids.remap_types(),
@@ -207,6 +223,10 @@ impl ExtractorSender {
         EmbeddingSender(&self.sender)
     }
 
+    pub fn geo(&self) -> GeoSender<'_> {
+        GeoSender(&self.sender)
+    }
+
     fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
         match self
             .sender
@@ -423,3 +443,34 @@ impl EmbeddingSender<'_> {
             .map_err(|_| SendError(()))
     }
 }
+
+pub struct GeoSender<'a>(&'a Sender<WriterOperation>);
+
+impl GeoSender<'_> {
+    pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
+        self.0
+            .send(WriterOperation::DbOperation(DbOperation {
+                database: Database::Main,
+                entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
+                    GEO_RTREE_KEY.as_bytes(),
+                    value,
+                )),
+            }))
+            .map_err(|_| SendError(()))
+    }
+
+    pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
+        let mut buffer = Vec::new();
+        bitmap.serialize_into(&mut buffer).unwrap();
+
+        self.0
+            .send(WriterOperation::DbOperation(DbOperation {
+                database: Database::Main,
+                entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
+                    GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
+                    &buffer,
+                )),
+            }))
+            .map_err(|_| SendError(()))
+    }
+}
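The `Small`/`Large` split exists because the serialized rtree can be very large: small entries copy key and value into one boxed buffer, while large ones keep the value behind a memory map so it is never copied into the channel. A minimal sketch of the same idea, with a simplified `KeyEntry` stand-in (the real one lives in `channel.rs`):

```rust
use memmap2::Mmap;

/// Simplified stand-in for the real `KeyEntry`.
struct KeyEntry(Box<[u8]>);

enum Entry {
    /// Key and value packed in one allocation; `key_length` marks the split.
    Small { key_length: usize, data: Box<[u8]> },
    /// The value stays in a memory map; only the key is copied.
    Large { key: KeyEntry, data: Mmap },
}

impl Entry {
    fn value(&self) -> &[u8] {
        match self {
            Entry::Small { key_length, data } => &data[*key_length..],
            Entry::Large { data, .. } => &data[..],
        }
    }
}

fn main() {
    let (key, value) = (b"geo-rtree".as_slice(), b"payload".as_slice());
    let mut data = Vec::with_capacity(key.len() + value.len());
    data.extend_from_slice(key);
    data.extend_from_slice(value);
    let entry = Entry::Small { key_length: key.len(), data: data.into_boxed_slice() };
    assert_eq!(entry.value(), value);
}
```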
diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index 692277597..8d4e3b0a9 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -352,6 +352,11 @@ where
         unordered_field_buffer.push((vectors_fid, &vectors_value));
     }
 
+    if let Some(geo_value) = document.geo_field()? {
+        let fid = fields_ids_map.id_or_insert("_geo").ok_or(UserError::AttributeLimitReached)?;
+        unordered_field_buffer.push((fid, geo_value));
+    }
+
     unordered_field_buffer.sort_by_key(|(fid, _)| *fid);
     for (fid, value) in unordered_field_buffer.iter() {
         writer.insert(*fid, value.get().as_bytes()).unwrap();
@@ -406,6 +411,7 @@ impl<'doc> Versions<'doc> {
     pub fn is_empty(&self) -> bool {
         self.data.is_empty()
     }
+
     pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> {
         if k == RESERVED_VECTORS_FIELD_NAME || k == "_geo" {
             return None;
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index 2c93a5def..b76fe207a 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -54,7 +54,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                 DocumentChange::Deletion(deletion) => {
                     let docid = deletion.docid();
                     let content = deletion.current(
-                        &context.txn,
+                        &context.rtxn,
                         context.index,
                         &context.db_fields_ids_map,
                     )?;
@@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                 DocumentChange::Update(update) => {
                     let docid = update.docid();
                     let content =
-                        update.current(&context.txn, context.index, &context.db_fields_ids_map)?;
+                        update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
                     for res in content.iter_top_level_fields() {
                         let (f, _) = res?;
                         let entry = document_extractor_data
@@ -92,9 +92,9 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                     }
 
                     let content =
-                        update.merged(&context.txn, context.index, &context.db_fields_ids_map)?;
+                        update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
                     let vector_content = update.merged_vectors(
-                        &context.txn,
+                        &context.rtxn,
                         context.index,
                         &context.db_fields_ids_map,
                         &context.doc_alloc,
diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
index 11dc8f3c7..d0dc425ae 100644
--- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -63,7 +63,7 @@ impl FacetedDocidsExtractor {
         document_change: DocumentChange,
     ) -> Result<()> {
         let index = &context.index;
-        let rtxn = &context.txn;
+        let rtxn = &context.rtxn;
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
         let mut cached_sorter = context.data.borrow_mut_or_yield();
         match document_change {
diff --git a/crates/milli/src/update/new/extract/faceted/facet_document.rs b/crates/milli/src/update/new/extract/faceted/facet_document.rs
index cf8984f9c..4308d0aa5 100644
--- a/crates/milli/src/update/new/extract/faceted/facet_document.rs
+++ b/crates/milli/src/update/new/extract/faceted/facet_document.rs
@@ -10,7 +10,8 @@ pub fn extract_document_facets<'doc>(
     field_id_map: &mut GlobalFieldsIdsMap,
     facet_fn: &mut impl FnMut(FieldId, &Value) -> Result<()>,
 ) -> Result<()> {
-    for res in document.iter_top_level_fields() {
+    let geo = document.geo_field().transpose().map(|res| res.map(|rval| ("_geo", rval)));
+    for res in document.iter_top_level_fields().chain(geo) {
         let (field_name, value) = res?;
 
         let mut tokenize_field = |name: &str, value: &Value| match field_id_map.id_or_insert(name) {
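The `transpose`/`map` dance in `facet_document.rs` turns the `Result<Option<_>>` returned by `geo_field()` into an `Option<Result<(&str, _)>>`, which is exactly the shape `chain` needs to append `_geo` to the iterator of top-level fields. A minimal sketch of that conversion, with a hypothetical `geo_field` accessor and `serde_json::Value` standing in for the real document types:

```rust
use serde_json::Value;

type Result<T> = std::result::Result<T, String>;

// Hypothetical accessor standing in for `Document::geo_field`.
fn geo_field(doc: &Value) -> Result<Option<&Value>> {
    Ok(doc.get("_geo"))
}

fn main() -> Result<()> {
    let doc: Value = serde_json::from_str(r#"{ "id": 1, "_geo": { "lat": 0.0, "lng": 1.0 } }"#)
        .map_err(|e| e.to_string())?;

    // Result<Option<&Value>> -> Option<Result<(&str, &Value)>>, ready to `chain`.
    let geo = geo_field(&doc).transpose().map(|res| res.map(|v| ("_geo", v)));

    let fields = doc.as_object().unwrap().iter().filter(|(k, _)| *k != "_geo");
    for res in fields.map(|(k, v)| Ok((k.as_str(), v))).chain(geo) {
        let (name, value) = res?;
        println!("{name}: {value}");
    }
    Ok(())
}
```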
diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs
new file mode 100644
index 000000000..180611eee
--- /dev/null
+++ b/crates/milli/src/update/new/extract/geo/mod.rs
@@ -0,0 +1,302 @@
+use std::cell::RefCell;
+use std::fs::File;
+use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _};
+use std::{iter, mem, result};
+
+use bumpalo::Bump;
+use bytemuck::{bytes_of, from_bytes, pod_read_unaligned, Pod, Zeroable};
+use heed::RoTxn;
+use serde_json::value::RawValue;
+use serde_json::Value;
+
+use crate::error::GeoError;
+use crate::update::new::document::Document;
+use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
+use crate::update::new::ref_cell_ext::RefCellExt as _;
+use crate::update::new::DocumentChange;
+use crate::update::GrenadParameters;
+use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Object, Result};
+
+pub struct GeoExtractor {
+    grenad_parameters: GrenadParameters,
+}
+
+impl GeoExtractor {
+    pub fn new(
+        rtxn: &RoTxn,
+        index: &Index,
+        grenad_parameters: GrenadParameters,
+    ) -> Result<Option<Self>> {
+        let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
+        let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
+        if is_sortable || is_filterable {
+            Ok(Some(GeoExtractor { grenad_parameters }))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+#[derive(Pod, Zeroable, Copy, Clone)]
+#[repr(C, packed)]
+pub struct ExtractedGeoPoint {
+    pub docid: DocumentId,
+    pub lat_lng: [f64; 2],
+}
+
+impl From<ExtractedGeoPoint> for GeoPoint {
+    /// Converts the latitude and longitude back to an xyz GeoPoint.
+    fn from(value: ExtractedGeoPoint) -> Self {
+        let [lat, lng] = value.lat_lng;
+        let point = [lat, lng];
+        let xyz_point = lat_lng_to_xyz(&point);
+        GeoPoint::new(xyz_point, (value.docid, point))
+    }
+}
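For context, `lat_lng_to_xyz` projects the degree coordinates onto a sphere so the rtree can index points in Cartesian space. A minimal sketch of that projection (the exact implementation lives elsewhere in milli; the unit-sphere formula below is an assumption shown for illustration):

```rust
/// Projects `[lat, lng]` in degrees onto a unit sphere.
fn lat_lng_to_xyz(coord: &[f64; 2]) -> [f64; 3] {
    let [lat, lng] = coord.map(f64::to_radians);
    [lat.cos() * lng.cos(), lat.cos() * lng.sin(), lat.sin()]
}

fn main() {
    // The north pole maps to the top of the sphere.
    let [x, y, z] = lat_lng_to_xyz(&[90.0, 0.0]);
    assert!(x.abs() < 1e-12 && y.abs() < 1e-12 && (z - 1.0).abs() < 1e-12);
}
```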
+
+pub struct GeoExtractorData<'extractor> {
+    /// The set of document ids that were removed. If a document sees its geo
+    /// point being updated, we first put it in the deleted and then in the inserted.
+    removed: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
+    inserted: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
+    /// Removed geo points that no longer fit in memory, spilled to a temporary file.
+    spilled_removed: Option<BufWriter<File>>,
+    /// Inserted geo points that no longer fit in memory, spilled to a temporary file.
+    spilled_inserted: Option<BufWriter<File>>,
+}
+
+impl<'extractor> GeoExtractorData<'extractor> {
+    pub fn freeze(self) -> Result<FrozenGeoExtractorData<'extractor>> {
+        let GeoExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
+
+        Ok(FrozenGeoExtractorData {
+            removed: removed.into_bump_slice(),
+            inserted: inserted.into_bump_slice(),
+            spilled_removed: spilled_removed
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+            spilled_inserted: spilled_inserted
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+        })
+    }
+}
+
+unsafe impl MostlySend for GeoExtractorData<'_> {}
+
+pub struct FrozenGeoExtractorData<'extractor> {
+    pub removed: &'extractor [ExtractedGeoPoint],
+    pub inserted: &'extractor [ExtractedGeoPoint],
+    pub spilled_removed: Option<BufReader<File>>,
+    pub spilled_inserted: Option<BufReader<File>>,
+}
+
+impl<'extractor> FrozenGeoExtractorData<'extractor> {
+    pub fn iter_and_clear_removed(
+        &mut self,
+    ) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+        mem::take(&mut self.removed)
+            .iter()
+            .copied()
+            .map(Ok)
+            .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed))
+    }
+
+    pub fn iter_and_clear_inserted(
+        &mut self,
+    ) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+        mem::take(&mut self.inserted)
+            .iter()
+            .copied()
+            .map(Ok)
+            .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted))
+    }
+}
+
+fn iterator_over_spilled_geopoints(
+    spilled: &mut Option<BufReader<File>>,
+) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+    let mut spilled = spilled.take();
+    iter::from_fn(move || match &mut spilled {
+        Some(file) => {
+            let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
+            match file.read_exact(geopoint_bytes) {
+                Ok(()) => Some(Ok(pod_read_unaligned(geopoint_bytes))),
+                Err(e) if e.kind() == ErrorKind::UnexpectedEof => None,
+                Err(e) => Some(Err(e)),
+            }
+        }
+        None => None,
+    })
+}
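Because `ExtractedGeoPoint` is `Pod` and `#[repr(C, packed)]`, spilling is just writing `bytes_of(&point)` and reading back fixed-size chunks with `pod_read_unaligned`, exactly as the iterator above does. A minimal round-trip sketch with an in-memory cursor instead of a temp file:

```rust
use std::io::{Cursor, ErrorKind, Read, Write};

use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};

#[derive(Pod, Zeroable, Copy, Clone, Debug, PartialEq)]
#[repr(C, packed)]
struct ExtractedGeoPoint {
    docid: u32,
    lat_lng: [f64; 2],
}

fn main() -> std::io::Result<()> {
    let point = ExtractedGeoPoint { docid: 42, lat_lng: [48.86, 2.35] };

    // Spill: the struct is written as its raw bytes.
    let mut spill = Cursor::new(Vec::new());
    spill.write_all(bytes_of(&point))?;

    // Read back fixed-size records until EOF, like `iterator_over_spilled_geopoints`.
    spill.set_position(0);
    let buf = &mut [0u8; std::mem::size_of::<ExtractedGeoPoint>()];
    loop {
        match spill.read_exact(buf) {
            Ok(()) => assert_eq!(pod_read_unaligned::<ExtractedGeoPoint>(buf), point),
            Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```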
+
+impl<'extractor> Extractor<'extractor> for GeoExtractor {
+    type Data = RefCell<GeoExtractorData<'extractor>>;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
+        Ok(RefCell::new(GeoExtractorData {
+            removed: bumpalo::collections::Vec::new_in(extractor_alloc),
+            // inserted: Uell::new_in(extractor_alloc),
+            inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
+            spilled_inserted: None,
+            spilled_removed: None,
+        }))
+    }
+
+    fn process<'doc>(
+        &'doc self,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &'doc DocumentChangeContext<Self::Data>,
+    ) -> Result<()> {
+        let rtxn = &context.rtxn;
+        let index = context.index;
+        let max_memory = self.grenad_parameters.max_memory;
+        let db_fields_ids_map = context.db_fields_ids_map;
+        let mut data_ref = context.data.borrow_mut_or_yield();
+
+        for change in changes {
+            if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
+                // We must spill as we allocated too much memory
+                data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+                data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+            }
+
+            match change? {
+                DocumentChange::Deletion(deletion) => {
+                    let docid = deletion.docid();
+                    let external_id = deletion.external_document_id();
+                    let current = deletion.current(rtxn, index, db_fields_ids_map)?;
+                    let current_geo = current
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if let Some(lat_lng) = current_geo.flatten() {
+                        let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                        match &mut data_ref.spilled_removed {
+                            Some(file) => file.write_all(bytes_of(&geopoint))?,
+                            None => data_ref.removed.push(geopoint),
+                        }
+                    }
+                }
+                DocumentChange::Update(update) => {
+                    let current = update.current(rtxn, index, db_fields_ids_map)?;
+                    let external_id = update.external_document_id();
+                    let docid = update.docid();
+
+                    let current_geo = current
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    let updated_geo = update
+                        .updated()
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if current_geo != updated_geo {
+                        // If the current and new geo points are different it means that
+                        // we need to replace the current by the new point and therefore
+                        // delete the current point from the RTree.
+                        if let Some(lat_lng) = current_geo.flatten() {
+                            let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                            match &mut data_ref.spilled_removed {
+                                Some(file) => file.write_all(bytes_of(&geopoint))?,
+                                None => data_ref.removed.push(geopoint),
+                            }
+                        }
+
+                        if let Some(lat_lng) = updated_geo.flatten() {
+                            let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                            match &mut data_ref.spilled_inserted {
+                                Some(file) => file.write_all(bytes_of(&geopoint))?,
+                                None => data_ref.inserted.push(geopoint),
+                            }
+                        }
+                    }
+                }
+                DocumentChange::Insertion(insertion) => {
+                    let external_id = insertion.external_document_id();
+                    let docid = insertion.docid();
+
+                    let inserted_geo = insertion
+                        .inserted()
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if let Some(lat_lng) = inserted_geo.flatten() {
+                        let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                        match &mut data_ref.spilled_inserted {
+                            Some(file) => file.write_all(bytes_of(&geopoint))?,
+                            None => data_ref.inserted.push(geopoint),
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Extracts and validates the latitude and longitude from a document geo field.
+///
+/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
+fn extract_geo_coordinates(external_id: &str, raw_value: &RawValue) -> Result<Option<[f64; 2]>> {
+    let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
+        Value::Null => return Ok(None),
+        Value::Object(map) => map,
+        value => {
+            return Err(
+                GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
+            )
+        }
+    };
+
+    let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
+        (Some(lat), Some(lng)) => [lat, lng],
+        (Some(_), None) => {
+            return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
+        }
+        (None, Some(_)) => {
+            return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
+        }
+        (None, None) => {
+            return Err(GeoError::MissingLatitudeAndLongitude {
+                document_id: Value::from(external_id),
+            }
+            .into())
+        }
+    };
+
+    let lat = extract_finite_float_from_value(lat)
+        .map_err(|value| GeoError::BadLatitude { document_id: Value::from(external_id), value })?;
+
+    let lng = extract_finite_float_from_value(lng)
+        .map_err(|value| GeoError::BadLongitude { document_id: Value::from(external_id), value })?;
+
+    Ok(Some([lat, lng]))
+}
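The accepted shapes for `_geo` are therefore: `null` (no point), or an object whose `lat` and `lng` are numbers or number-like strings; anything else yields a `GeoError`. A quick illustration of which inputs pass, using a simplified checker (hypothetical, with the error type reduced to a message string):

```rust
use serde_json::{json, Value};

/// Simplified stand-in for `extract_geo_coordinates`: same accepted shapes,
/// but errors are reduced to plain strings.
fn check(geo: Value) -> Result<Option<[f64; 2]>, String> {
    let mut map = match geo {
        Value::Null => return Ok(None),
        Value::Object(map) => map,
        v => return Err(format!("not an object: {v}")),
    };
    let (lat, lng) = match (map.remove("lat"), map.remove("lng")) {
        (Some(lat), Some(lng)) => (lat, lng),
        _ => return Err("missing lat and/or lng".into()),
    };
    let as_f64 = |v: Value| match v {
        Value::Number(ref n) => n.as_f64().ok_or(v),
        Value::String(ref s) => s.parse::<f64>().map_err(|_| v),
        v => Err(v),
    };
    let lat = as_f64(lat).map_err(|v| format!("bad lat: {v}"))?;
    let lng = as_f64(lng).map_err(|v| format!("bad lng: {v}"))?;
    Ok(Some([lat, lng]))
}

fn main() {
    assert_eq!(check(json!(null)), Ok(None));
    assert_eq!(check(json!({ "lat": 0.0, "lng": "1.0" })), Ok(Some([0.0, 1.0])));
    assert!(check(json!([0.0, 1.0])).is_err()); // arrays are rejected
    assert!(check(json!({ "lat": 0.0 })).is_err()); // lng is mandatory
}
```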
+
+/// Extracts and validates that a serde JSON Value is actually a finite f64.
+pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
+    let number = match value {
+        Value::Number(ref n) => match n.as_f64() {
+            Some(number) => number,
+            None => return Err(value),
+        },
+        Value::String(ref s) => match s.parse::<f64>() {
+            Ok(number) => number,
+            Err(_) => return Err(value),
+        },
+        value => return Err(value),
+    };
+
+    if number.is_finite() {
+        Ok(number)
+    } else {
+        Err(value)
+    }
+}
diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs
index af6a29d07..14cfa83cb 100644
--- a/crates/milli/src/update/new/extract/mod.rs
+++ b/crates/milli/src/update/new/extract/mod.rs
@@ -1,6 +1,7 @@
 mod cache;
 mod documents;
 mod faceted;
+mod geo;
 mod searchable;
 mod vectors;
 
@@ -8,6 +9,7 @@ use bumpalo::Bump;
 pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
 pub use documents::*;
 pub use faceted::*;
+pub use geo::*;
 pub use searchable::*;
 pub use vectors::EmbeddingExtractor;
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
index 89583bd93..0223895e6 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -326,7 +326,7 @@ impl WordDocidsExtractors {
         document_change: DocumentChange,
     ) -> Result<()> {
         let index = &context.index;
-        let rtxn = &context.txn;
+        let rtxn = &context.rtxn;
         let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
         let cached_sorter = cached_sorter_ref.as_mut().unwrap();
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
index 7f9fff38f..f637cff49 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
@@ -39,7 +39,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
         let doc_alloc = &context.doc_alloc;
 
         let index = context.index;
-        let rtxn = &context.txn;
+        let rtxn = &context.rtxn;
 
         let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc);
         let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);
diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 3a73ff82f..2fb717c71 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -2,13 +2,13 @@ use std::cell::RefCell;
 
 use bumpalo::collections::Vec as BVec;
 use bumpalo::Bump;
-use hashbrown::HashMap;
+use hashbrown::{DefaultHashBuilder, HashMap};
 
 use super::cache::DelAddRoaringBitmap;
 use crate::error::FaultSource;
 use crate::prompt::Prompt;
 use crate::update::new::channel::EmbeddingSender;
-use crate::update::new::indexer::document_changes::{Extractor, MostlySend};
+use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
 use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
 use crate::vector::error::{
@@ -37,7 +37,7 @@ impl<'a> EmbeddingExtractor<'a> {
 }
 
 pub struct EmbeddingExtractorData<'extractor>(
-    pub HashMap<String, DelAddRoaringBitmap>,
+    pub HashMap<String, DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>,
 );
 
 unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
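`EmbeddingExtractorData` now spells out all four `HashMap` type parameters because the map is allocated inside the per-extractor `Bump`. A minimal sketch of a bump-allocated hashbrown map (assuming hashbrown's `allocator-api2` integration and bumpalo's `allocator_api2` feature, which is how these crates are typically wired together):

```rust
use bumpalo::Bump;
use hashbrown::{DefaultHashBuilder, HashMap};

fn main() {
    let bump = Bump::new();
    // The map's buckets live in the bump arena and are freed all at once
    // when `bump` is dropped, which is the point of the fourth parameter.
    let mut map: HashMap<&str, u32, DefaultHashBuilder, &Bump> = HashMap::new_in(&bump);
    map.insert("default", 1);
    assert_eq!(map.get("default"), Some(&1));
}
```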
@@ -52,9 +52,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-        context: &'doc crate::update::new::indexer::document_changes::DocumentChangeContext<
-            Self::Data,
-        >,
+        context: &'doc DocumentChangeContext<Self::Data>,
     ) -> crate::Result<()> {
         let embedders = self.embedders.inner_as_ref();
         let mut unused_vectors_distribution =
@@ -63,7 +61,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
         let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
         for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
             let embedder_id =
-                context.index.embedder_category_id.get(&context.txn, embedder_name)?.ok_or_else(
+                context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else(
                     || InternalError::DatabaseMissingEntry {
                         db_name: "embedder_category_id",
                         key: None,
@@ -95,7 +93,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                 }
                 DocumentChange::Update(update) => {
                     let old_vectors = update.current_vectors(
-                        &context.txn,
+                        &context.rtxn,
                         context.index,
                         context.db_fields_ids_map,
                         &context.doc_alloc,
@@ -132,7 +130,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                     } else if new_vectors.regenerate {
                         let new_rendered = prompt.render_document(
                             update.current(
-                                &context.txn,
+                                &context.rtxn,
                                 context.index,
                                 context.db_fields_ids_map,
                             )?,
@@ -141,7 +139,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                         )?;
                         let old_rendered = prompt.render_document(
                             update.merged(
-                                &context.txn,
+                                &context.rtxn,
                                 context.index,
                                 context.db_fields_ids_map,
                             )?,
@@ -160,7 +158,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                     } else if old_vectors.regenerate {
                         let old_rendered = prompt.render_document(
                             update.current(
-                                &context.txn,
+                                &context.rtxn,
                                 context.index,
                                 context.db_fields_ids_map,
                             )?,
@@ -169,7 +167,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                         )?;
                         let new_rendered = prompt.render_document(
                             update.merged(
-                                &context.txn,
+                                &context.rtxn,
                                 context.index,
                                 context.db_fields_ids_map,
                             )?,
diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs
index b9bf79e47..e4b088f31 100644
--- a/crates/milli/src/update/new/indexer/document_changes.rs
+++ b/crates/milli/src/update/new/indexer/document_changes.rs
@@ -197,7 +197,7 @@ pub struct DocumentChangeContext<
     /// inside of the DB.
     pub db_fields_ids_map: &'indexer FieldsIdsMap,
     /// A transaction providing data from the DB before all indexing operations
-    pub txn: RoTxn<'indexer>,
+    pub rtxn: RoTxn<'indexer>,
 
     /// Global field id map that is up to date with the current state of the indexing process.
     ///
@@ -255,7 +255,7 @@ impl<
         let txn = index.read_txn()?;
         Ok(DocumentChangeContext {
             index,
-            txn,
+            rtxn: txn,
             db_fields_ids_map,
             new_fields_ids_map: fields_ids_map,
             doc_alloc,
diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs
index d7648acd8..e89b04223 100644
--- a/crates/milli/src/update/new/indexer/document_deletion.rs
+++ b/crates/milli/src/update/new/indexer/document_deletion.rs
@@ -63,7 +63,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     where
         'pl: 'doc, // the payload must survive the process calls
     {
-        let current = context.index.document(&context.txn, *docid)?;
+        let current = context.index.document(&context.rtxn, *docid)?;
 
         let external_document_id = self.primary_key.extract_docid_from_db(
             current,
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 6d1d0eea8..e3b24642e 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -33,6 +33,7 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
 use crate::proximity::ProximityPrecision;
 use crate::update::del_add::DelAdd;
 use crate::update::new::extract::EmbeddingExtractor;
+use crate::update::new::merger::merge_and_send_rtree;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
@@ -57,6 +58,7 @@ mod steps {
         "extracting words",
         "extracting word proximity",
         "extracting embeddings",
+        "writing geo points",
         "writing to database",
         "writing embeddings to database",
         "waiting for extractors",
@@ -93,29 +95,33 @@ mod steps {
         step(4)
     }
 
-    pub const fn write_db() -> (u16, &'static str) {
+    pub const fn extract_geo_points() -> (u16, &'static str) {
         step(5)
     }
 
-    pub const fn write_embedding_db() -> (u16, &'static str) {
+    pub const fn write_db() -> (u16, &'static str) {
         step(6)
     }
 
-    pub const fn waiting_extractors() -> (u16, &'static str) {
+    pub const fn write_embedding_db() -> (u16, &'static str) {
         step(7)
     }
 
-    pub const fn post_processing_facets() -> (u16, &'static str) {
+    pub const fn waiting_extractors() -> (u16, &'static str) {
         step(8)
     }
 
-    pub const fn post_processing_words() -> (u16, &'static str) {
+    pub const fn post_processing_facets() -> (u16, &'static str) {
         step(9)
     }
 
-    pub const fn finalizing() -> (u16, &'static str) {
+    pub const fn post_processing_words() -> (u16, &'static str) {
         step(10)
     }
+
+    pub const fn finalizing() -> (u16, &'static str) {
+        step(11)
+    }
 }
 
 /// This is the main function of this crate.
@@ -144,11 +150,8 @@ where
     let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
 
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
-
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
-
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
-
     let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
     let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
     let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@@ -328,7 +331,15 @@ where
 
             let (finished_steps, step_name) = steps::extract_word_proximity();
 
-            let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
+            let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
+                document_changes,
+                indexing_context,
+                &mut extractor_allocs,
+                finished_steps,
+                total_steps,
+                step_name,
+            )?;
+
             merge_and_send_docids(
                 caches,
                 index.word_pair_proximity_docids.remap_types(),
@@ -351,8 +362,6 @@ where
                 let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
                 let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
                 let (finished_steps, step_name) = steps::extract_embeddings();
-
-
                 extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
 
                 for config in &mut index_embeddings {
@@ -366,6 +375,35 @@ where
                 embedding_sender.finish(index_embeddings).unwrap();
             }
 
+            'geo: {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                let _entered = span.enter();
+
+                let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
+                    break 'geo;
+                };
+                let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
+                let (finished_steps, step_name) = steps::extract_geo_points();
+                extract(document_changes,
+                    &extractor,
+                    indexing_context,
+                    &mut extractor_allocs,
+                    &datastore,
+                    finished_steps,
+                    total_steps,
+                    step_name,
+                )?;
+
+                merge_and_send_rtree(
+                    datastore,
+                    &rtxn,
+                    index,
+                    extractor_sender.geo(),
+                    &indexing_context.must_stop_processing,
+                )?;
+            }
+
             // TODO THIS IS TOO MUCH
             // - [ ] Extract fieldid docid facet number
             // - [ ] Extract fieldid docid facet string
diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs
index eb7252445..f6df3981d 100644
--- a/crates/milli/src/update/new/indexer/update_by_function.rs
+++ b/crates/milli/src/update/new/indexer/update_by_function.rs
@@ -93,7 +93,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
         let DocumentChangeContext {
             index,
             db_fields_ids_map,
-            txn,
+            rtxn: txn,
             new_fields_ids_map,
             doc_alloc,
             ..
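The `'geo:` block above uses a labeled-block early exit: `GeoExtractor::new` returns `Ok(None)` when `_geo` is neither sortable nor filterable, and `break 'geo` skips the whole step without an early `return` from the enclosing function. The pattern in isolation:

```rust
fn main() {
    let maybe_extractor: Option<&str> = None; // stand-in for GeoExtractor::new(..)?

    'geo: {
        // `let ... else` diverges when there is no extractor, skipping the step.
        let Some(extractor) = maybe_extractor else { break 'geo };
        println!("extracting with {extractor}");
    }

    println!("indexing continues either way");
}
```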
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index 4eca113ea..c81f84f43 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -1,68 +1,63 @@
-use std::io::{self};
+use std::cell::RefCell;
+use std::io;
 
-use bincode::ErrorKind;
 use hashbrown::HashSet;
 use heed::types::Bytes;
 use heed::{Database, RoTxn};
+use memmap2::Mmap;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::{
     merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
+    GeoExtractorData,
 };
-use super::DocumentChange;
-use crate::{
-    CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError,
-    Result,
-};
+use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
 
-pub struct GeoExtractor {
-    rtree: Option<rstar::RTree<GeoPoint>>,
-}
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
+pub fn merge_and_send_rtree<'extractor, MSP>(
+    datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
+    rtxn: &RoTxn,
+    index: &Index,
+    geo_sender: GeoSender<'_>,
+    must_stop_processing: &MSP,
+) -> Result<()>
+where
+    MSP: Fn() -> bool + Sync,
+{
+    let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
+    let mut faceted = index.geo_faceted_documents_ids(rtxn)?;
 
-impl GeoExtractor {
-    pub fn new(rtxn: &RoTxn, index: &Index) -> Result<Option<Self>> {
-        let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
-        let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
-        if is_sortable || is_filterable {
-            Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
-        } else {
-            Ok(None)
+    for data in datastore {
+        if must_stop_processing() {
+            return Err(InternalError::AbortedIndexation.into());
+        }
+
+        let mut frozen = data.into_inner().freeze()?;
+        for result in frozen.iter_and_clear_removed() {
+            let extracted_geo_point = result?;
+            // Keep the removal outside of debug_assert! so it also runs in release builds.
+            let removed = rtree.remove(&GeoPoint::from(extracted_geo_point));
+            debug_assert!(removed.is_some());
+            let removed = faceted.remove(extracted_geo_point.docid);
+            debug_assert!(removed);
+        }
+
+        for result in frozen.iter_and_clear_inserted() {
+            let extracted_geo_point = result?;
+            rtree.insert(GeoPoint::from(extracted_geo_point));
+            let inserted = faceted.insert(extracted_geo_point.docid);
+            debug_assert!(inserted);
         }
     }
 
-    pub fn manage_change(
-        &mut self,
-        fidmap: &mut GlobalFieldsIdsMap,
-        change: &DocumentChange,
-    ) -> Result<()> {
-        match change {
-            DocumentChange::Deletion(_) => todo!(),
-            DocumentChange::Update(_) => todo!(),
-            DocumentChange::Insertion(_) => todo!(),
-        }
-    }
+    let mut file = tempfile::tempfile()?;
+    // TODO: manage this error instead of unwrapping
+    bincode::serialize_into(&mut file, &rtree).unwrap();
+    file.sync_all()?;
 
-    pub fn serialize_rtree<W: io::Write>(self, writer: &mut W) -> Result<bool> {
-        match self.rtree {
-            Some(rtree) => {
-                // TODO What should I do?
-                bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e {
-                    ErrorKind::Io(e) => Error::IoError(e),
-                    ErrorKind::InvalidUtf8Encoding(_) => todo!(),
-                    ErrorKind::InvalidBoolEncoding(_) => todo!(),
-                    ErrorKind::InvalidCharEncoding => todo!(),
-                    ErrorKind::InvalidTagEncoding(_) => todo!(),
-                    ErrorKind::DeserializeAnyNotSupported => todo!(),
-                    ErrorKind::SizeLimit => todo!(),
-                    ErrorKind::SequenceMustHaveLength => todo!(),
-                    ErrorKind::Custom(_) => todo!(),
-                })
-            }
-            None => Ok(false),
-        }
-    }
+    // SAFETY: the anonymous temp file is exclusively owned and no longer written to.
+    let rtree_mmap = unsafe { Mmap::map(&file)? };
+    geo_sender.set_rtree(rtree_mmap).unwrap();
+    geo_sender.set_geo_faceted(&faceted).unwrap();
+
+    Ok(())
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
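To close the loop: the freshly merged rtree is bincode-serialized into an anonymous temp file, the file is mmapped, and the map is shipped through `GeoSender::set_rtree` as a `KeyValueEntry::Large`, so the writer thread streams it into LMDB without an intermediate copy. A standalone round-trip sketch of that serialize-then-mmap step (assuming rstar's `serde` feature for a serializable `RTree`):

```rust
use memmap2::Mmap;
use rstar::RTree;

fn main() -> std::io::Result<()> {
    let rtree: RTree<[f64; 2]> = RTree::bulk_load(vec![[48.86, 2.35], [40.71, -74.0]]);

    // Serialize into an anonymous temp file, as merge_and_send_rtree does.
    let mut file = tempfile::tempfile()?;
    bincode::serialize_into(&mut file, &rtree).expect("TODO: surface as a user-facing error");
    file.sync_all()?;

    // SAFETY: the anonymous file is exclusively owned and no longer written to.
    let mmap = unsafe { Mmap::map(&file)? };

    // The writer thread sees only these bytes; deserializing proves the round trip.
    let decoded: RTree<[f64; 2]> = bincode::deserialize(&mmap).expect("valid bincode");
    assert_eq!(decoded.size(), rtree.size());
    Ok(())
}
```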