mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-14 15:27:28 +01:00
Index geo points based on the settings differences
This commit is contained in:
parent
0f78703b85
commit
fc7e817221
milli/src/update
@ -8,6 +8,7 @@ use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
|
||||
use crate::error::GeoError;
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::extract_finite_float_from_value;
|
||||
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
||||
use crate::{FieldId, InternalError, Result};
|
||||
|
||||
/// Extracts the geographical coordinates contained in each document under the `_geo` field.
|
||||
@ -18,7 +19,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
|
||||
obkv_documents: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
primary_key_id: FieldId,
|
||||
(lat_fid, lng_fid): (FieldId, FieldId),
|
||||
settings_diff: &InnerIndexSettingsDiff,
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
puffin::profile_function!();
|
||||
|
||||
@ -40,47 +41,27 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
|
||||
serde_json::from_slice(document_id).unwrap()
|
||||
};
|
||||
|
||||
// first we get the two fields
|
||||
match (obkv.get(lat_fid), obkv.get(lng_fid)) {
|
||||
(Some(lat), Some(lng)) => {
|
||||
let deladd_lat_obkv = KvReaderDelAdd::new(lat);
|
||||
let deladd_lng_obkv = KvReaderDelAdd::new(lng);
|
||||
// extract old version
|
||||
let del_lat_lng =
|
||||
extract_lat_lng(&obkv, &settings_diff.old, DelAdd::Deletion, &document_id)?;
|
||||
// extract new version
|
||||
let add_lat_lng =
|
||||
extract_lat_lng(&obkv, &settings_diff.new, DelAdd::Addition, &document_id)?;
|
||||
|
||||
// then we extract the values
|
||||
let del_lat_lng = deladd_lat_obkv
|
||||
.get(DelAdd::Deletion)
|
||||
.zip(deladd_lng_obkv.get(DelAdd::Deletion))
|
||||
.map(|(lat, lng)| extract_lat_lng(lat, lng, document_id))
|
||||
.transpose()?;
|
||||
let add_lat_lng = deladd_lat_obkv
|
||||
.get(DelAdd::Addition)
|
||||
.zip(deladd_lng_obkv.get(DelAdd::Addition))
|
||||
.map(|(lat, lng)| extract_lat_lng(lat, lng, document_id))
|
||||
.transpose()?;
|
||||
|
||||
if del_lat_lng != add_lat_lng {
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
if let Some([lat, lng]) = del_lat_lng {
|
||||
#[allow(clippy::drop_non_drop)]
|
||||
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
||||
obkv.insert(DelAdd::Deletion, bytes)?;
|
||||
}
|
||||
if let Some([lat, lng]) = add_lat_lng {
|
||||
#[allow(clippy::drop_non_drop)]
|
||||
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
||||
obkv.insert(DelAdd::Addition, bytes)?;
|
||||
}
|
||||
let bytes = obkv.into_inner()?;
|
||||
writer.insert(docid_bytes, bytes)?;
|
||||
}
|
||||
if del_lat_lng != add_lat_lng {
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
if let Some([lat, lng]) = del_lat_lng {
|
||||
#[allow(clippy::drop_non_drop)]
|
||||
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
||||
obkv.insert(DelAdd::Deletion, bytes)?;
|
||||
}
|
||||
(None, Some(_)) => {
|
||||
return Err(GeoError::MissingLatitude { document_id: document_id() }.into())
|
||||
if let Some([lat, lng]) = add_lat_lng {
|
||||
#[allow(clippy::drop_non_drop)]
|
||||
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
||||
obkv.insert(DelAdd::Addition, bytes)?;
|
||||
}
|
||||
(Some(_), None) => {
|
||||
return Err(GeoError::MissingLongitude { document_id: document_id() }.into())
|
||||
}
|
||||
(None, None) => (),
|
||||
let bytes = obkv.into_inner()?;
|
||||
writer.insert(docid_bytes, bytes)?;
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,16 +69,37 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
|
||||
}
|
||||
|
||||
/// Extract the finite floats lat and lng from two bytes slices.
|
||||
fn extract_lat_lng(lat: &[u8], lng: &[u8], document_id: impl Fn() -> Value) -> Result<[f64; 2]> {
|
||||
let lat = extract_finite_float_from_value(
|
||||
serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?,
|
||||
)
|
||||
.map_err(|lat| GeoError::BadLatitude { document_id: document_id(), value: lat })?;
|
||||
fn extract_lat_lng(
|
||||
document: &obkv::KvReader<FieldId>,
|
||||
settings: &InnerIndexSettings,
|
||||
deladd: DelAdd,
|
||||
document_id: impl Fn() -> Value,
|
||||
) -> Result<Option<[f64; 2]>> {
|
||||
match settings.geo_fields_ids {
|
||||
Some((lat_fid, lng_fid)) => {
|
||||
let lat = document.get(lat_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd));
|
||||
let lng = document.get(lng_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd));
|
||||
let (lat, lng) = match (lat, lng) {
|
||||
(Some(lat), Some(lng)) => (lat, lng),
|
||||
(Some(lat), None) => {
|
||||
return Err(GeoError::MissingLatitude { document_id: document_id() }.into())
|
||||
}
|
||||
(None, Some(lng)) => {
|
||||
return Err(GeoError::MissingLongitude { document_id: document_id() }.into())
|
||||
}
|
||||
(None, None) => return Ok(None),
|
||||
};
|
||||
let lat = extract_finite_float_from_value(
|
||||
serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?,
|
||||
)
|
||||
.map_err(|lat| GeoError::BadLatitude { document_id: document_id(), value: lat })?;
|
||||
|
||||
let lng = extract_finite_float_from_value(
|
||||
serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?,
|
||||
)
|
||||
.map_err(|lng| GeoError::BadLongitude { document_id: document_id(), value: lng })?;
|
||||
|
||||
Ok([lat, lng])
|
||||
let lng = extract_finite_float_from_value(
|
||||
serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?,
|
||||
)
|
||||
.map_err(|lng| GeoError::BadLongitude { document_id: document_id(), value: lng })?;
|
||||
Ok(Some([lat, lng]))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,6 @@ pub(crate) fn data_from_obkv_documents(
|
||||
indexer: GrenadParameters,
|
||||
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
||||
primary_key_id: FieldId,
|
||||
geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
settings_diff: Arc<InnerIndexSettingsDiff>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
) -> Result<()> {
|
||||
@ -72,7 +71,6 @@ pub(crate) fn data_from_obkv_documents(
|
||||
indexer,
|
||||
lmdb_writer_sx.clone(),
|
||||
primary_key_id,
|
||||
geo_fields_ids,
|
||||
settings_diff.clone(),
|
||||
max_positions_per_attributes,
|
||||
)
|
||||
@ -300,7 +298,6 @@ fn send_and_extract_flattened_documents_data(
|
||||
indexer: GrenadParameters,
|
||||
lmdb_writer_sx: Sender<Result<TypedChunk>>,
|
||||
primary_key_id: FieldId,
|
||||
geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
settings_diff: Arc<InnerIndexSettingsDiff>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
) -> Result<(
|
||||
@ -310,12 +307,12 @@ fn send_and_extract_flattened_documents_data(
|
||||
let flattened_documents_chunk =
|
||||
flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
|
||||
|
||||
if let Some(geo_fields_ids) = geo_fields_ids {
|
||||
if settings_diff.run_geo_indexing() {
|
||||
let documents_chunk_cloned = flattened_documents_chunk.clone();
|
||||
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
||||
rayon::spawn(move || {
|
||||
let result =
|
||||
extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_fields_ids);
|
||||
extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, &settings_diff);
|
||||
let _ = match result {
|
||||
Ok(geo_points) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoPoints(geo_points))),
|
||||
Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
|
||||
|
@ -327,25 +327,6 @@ where
|
||||
// get the fid of the `_geo.lat` and `_geo.lng` fields.
|
||||
let mut field_id_map = self.index.fields_ids_map(self.wtxn)?;
|
||||
|
||||
// self.index.fields_ids_map($a)? ==>> field_id_map
|
||||
let geo_fields_ids = match field_id_map.id("_geo") {
|
||||
Some(gfid) => {
|
||||
let is_sortable = self.index.sortable_fields_ids(self.wtxn)?.contains(&gfid);
|
||||
let is_filterable = self.index.filterable_fields_ids(self.wtxn)?.contains(&gfid);
|
||||
// if `_geo` is faceted then we get the `lat` and `lng`
|
||||
if is_sortable || is_filterable {
|
||||
let field_ids = field_id_map
|
||||
.insert("_geo.lat")
|
||||
.zip(field_id_map.insert("_geo.lng"))
|
||||
.ok_or(UserError::AttributeLimitReached)?;
|
||||
Some(field_ids)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let pool_params = GrenadParameters {
|
||||
chunk_compression_type: self.indexer_config.chunk_compression_type,
|
||||
chunk_compression_level: self.indexer_config.chunk_compression_level,
|
||||
@ -412,7 +393,6 @@ where
|
||||
pool_params,
|
||||
lmdb_writer_sx.clone(),
|
||||
primary_key_id,
|
||||
geo_fields_ids,
|
||||
settings_diff.clone(),
|
||||
max_positions_per_attributes,
|
||||
)
|
||||
|
@ -1161,6 +1161,11 @@ impl InnerIndexSettingsDiff {
|
||||
pub fn settings_update_only(&self) -> bool {
|
||||
self.settings_update_only
|
||||
}
|
||||
|
||||
pub fn run_geo_indexing(&self) -> bool {
|
||||
self.old.geo_fields_ids != self.new.geo_fields_ids
|
||||
|| (!self.settings_update_only && self.new.geo_fields_ids.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -1177,6 +1182,7 @@ pub(crate) struct InnerIndexSettings {
|
||||
pub proximity_precision: ProximityPrecision,
|
||||
pub embedding_configs: EmbeddingConfigs,
|
||||
pub existing_fields: HashSet<String>,
|
||||
pub geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
}
|
||||
|
||||
impl InnerIndexSettings {
|
||||
@ -1200,6 +1206,24 @@ impl InnerIndexSettings {
|
||||
.into_iter()
|
||||
.filter_map(|(field, count)| (count != 0).then_some(field))
|
||||
.collect();
|
||||
// index.fields_ids_map($a)? ==>> fields_ids_map
|
||||
let geo_fields_ids = match fields_ids_map.id("_geo") {
|
||||
Some(gfid) => {
|
||||
let is_sortable = index.sortable_fields_ids(rtxn)?.contains(&gfid);
|
||||
let is_filterable = index.filterable_fields_ids(rtxn)?.contains(&gfid);
|
||||
// if `_geo` is faceted then we get the `lat` and `lng`
|
||||
if is_sortable || is_filterable {
|
||||
let field_ids = fields_ids_map
|
||||
.insert("_geo.lat")
|
||||
.zip(fields_ids_map.insert("_geo.lng"))
|
||||
.ok_or(UserError::AttributeLimitReached)?;
|
||||
Some(field_ids)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
stop_words,
|
||||
@ -1214,6 +1238,7 @@ impl InnerIndexSettings {
|
||||
proximity_precision,
|
||||
embedding_configs,
|
||||
existing_fields,
|
||||
geo_fields_ids,
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user