mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-11 05:54:30 +01:00
Extract the geo fields DelAdd and generate a new DelAdd obkv with it
This commit is contained in:
parent
ba90a5ec0e
commit
a3dae4db9b
@ -6,6 +6,7 @@ use serde_json::Value;
|
|||||||
|
|
||||||
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
|
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
|
||||||
use crate::error::GeoError;
|
use crate::error::GeoError;
|
||||||
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::update::index_documents::extract_finite_float_from_value;
|
use crate::update::index_documents::extract_finite_float_from_value;
|
||||||
use crate::{FieldId, InternalError, Result};
|
use crate::{FieldId, InternalError, Result};
|
||||||
|
|
||||||
@ -14,6 +15,7 @@ use crate::{FieldId, InternalError, Result};
|
|||||||
/// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude)
|
/// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude)
|
||||||
#[logging_timer::time]
|
#[logging_timer::time]
|
||||||
pub fn extract_geo_points<R: io::Read + io::Seek>(
|
pub fn extract_geo_points<R: io::Read + io::Seek>(
|
||||||
|
// TODO grenad::Reader<Obkv<FieldId, Obkv<DelAdd, JsonValue>>>
|
||||||
obkv_documents: grenad::Reader<R>,
|
obkv_documents: grenad::Reader<R>,
|
||||||
indexer: GrenadParameters,
|
indexer: GrenadParameters,
|
||||||
primary_key_id: FieldId,
|
primary_key_id: FieldId,
|
||||||
@ -30,39 +32,72 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
|
|||||||
let mut cursor = obkv_documents.into_cursor()?;
|
let mut cursor = obkv_documents.into_cursor()?;
|
||||||
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
||||||
let obkv = obkv::KvReader::new(value);
|
let obkv = obkv::KvReader::new(value);
|
||||||
// since we only needs the primary key when we throw an error we create this getter to
|
// since we only need the primary key when we throw an error
|
||||||
// lazily get it when needed
|
// we create this getter to lazily get it when needed
|
||||||
let document_id = || -> Value {
|
let document_id = || -> Value {
|
||||||
let document_id = obkv.get(primary_key_id).unwrap();
|
let document_id = obkv.get(primary_key_id).unwrap();
|
||||||
serde_json::from_slice(document_id).unwrap()
|
serde_json::from_slice(document_id).unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// HELP we will receive two DelAdds here, one for the lat and one for the lng
|
||||||
|
// what happens if there is a missing Del or Add for one of them?
|
||||||
|
|
||||||
// first we get the two fields
|
// first we get the two fields
|
||||||
let lat = obkv.get(lat_fid);
|
match (obkv.get(lat_fid), obkv.get(lng_fid)) {
|
||||||
let lng = obkv.get(lng_fid);
|
(Some(lat), Some(lng)) => {
|
||||||
|
let deladd_lat_obkv = KvReaderDelAdd::new(lat);
|
||||||
|
let deladd_lng_obkv = KvReaderDelAdd::new(lng);
|
||||||
|
|
||||||
if let Some((lat, lng)) = lat.zip(lng) {
|
// then we extract the values
|
||||||
// then we extract the values
|
let del_lat_lng = deladd_lat_obkv
|
||||||
let lat = extract_finite_float_from_value(
|
.get(DelAdd::Deletion)
|
||||||
serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?,
|
.zip(deladd_lng_obkv.get(DelAdd::Deletion))
|
||||||
)
|
.map(|(lat, lng)| extract_lat_lng(lat, lng, document_id))
|
||||||
.map_err(|lat| GeoError::BadLatitude { document_id: document_id(), value: lat })?;
|
.transpose()?;
|
||||||
|
let add_lat_lng = deladd_lat_obkv
|
||||||
|
.get(DelAdd::Addition)
|
||||||
|
.zip(deladd_lng_obkv.get(DelAdd::Addition))
|
||||||
|
.map(|(lat, lng)| extract_lat_lng(lat, lng, document_id))
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
let lng = extract_finite_float_from_value(
|
let mut obkv = KvWriterDelAdd::memory();
|
||||||
serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?,
|
if let Some([lat, lng]) = del_lat_lng {
|
||||||
)
|
#[allow(clippy::drop_non_drop)]
|
||||||
.map_err(|lng| GeoError::BadLongitude { document_id: document_id(), value: lng })?;
|
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
||||||
|
obkv.insert(DelAdd::Deletion, bytes)?;
|
||||||
#[allow(clippy::drop_non_drop)]
|
}
|
||||||
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
if let Some([lat, lng]) = add_lat_lng {
|
||||||
writer.insert(docid_bytes, bytes)?;
|
#[allow(clippy::drop_non_drop)]
|
||||||
} else if lat.is_none() && lng.is_some() {
|
let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
|
||||||
return Err(GeoError::MissingLatitude { document_id: document_id() })?;
|
obkv.insert(DelAdd::Addition, bytes)?;
|
||||||
} else if lat.is_some() && lng.is_none() {
|
}
|
||||||
return Err(GeoError::MissingLongitude { document_id: document_id() })?;
|
let bytes = obkv.into_inner()?;
|
||||||
|
writer.insert(docid_bytes, bytes)?;
|
||||||
|
}
|
||||||
|
(None, Some(_)) => {
|
||||||
|
return Err(GeoError::MissingLatitude { document_id: document_id() }.into())
|
||||||
|
}
|
||||||
|
(Some(_), None) => {
|
||||||
|
return Err(GeoError::MissingLongitude { document_id: document_id() }.into())
|
||||||
|
}
|
||||||
|
(None, None) => (),
|
||||||
}
|
}
|
||||||
// else => the _geo object was `null`, there is nothing to do
|
|
||||||
}
|
}
|
||||||
|
|
||||||
writer_into_reader(writer)
|
writer_into_reader(writer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extract the finite floats lat and lng from two bytes slices.
|
||||||
|
fn extract_lat_lng(lat: &[u8], lng: &[u8], document_id: impl Fn() -> Value) -> Result<[f64; 2]> {
|
||||||
|
let lat = extract_finite_float_from_value(
|
||||||
|
serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?,
|
||||||
|
)
|
||||||
|
.map_err(|lat| GeoError::BadLatitude { document_id: document_id(), value: lat })?;
|
||||||
|
|
||||||
|
let lng = extract_finite_float_from_value(
|
||||||
|
serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?,
|
||||||
|
)
|
||||||
|
.map_err(|lng| GeoError::BadLongitude { document_id: document_id(), value: lng })?;
|
||||||
|
|
||||||
|
Ok([lat, lng])
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user