Send the geo rtree through crossbeam channel

This commit is contained in:
Clément Renault 2024-11-27 18:03:45 +01:00
parent da650f834e
commit 5c488e20cc
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -166,7 +166,6 @@ pub struct DbOperation {
impl DbOperation { impl DbOperation {
pub fn key_value<'a>(&self, frame: &'a FrameGrantR<'_>) -> (&'a [u8], Option<&'a [u8]>) { pub fn key_value<'a>(&self, frame: &'a FrameGrantR<'_>) -> (&'a [u8], Option<&'a [u8]>) {
/// TODO replace the return type by an enum Write | Delete
let skip = EntryHeader::variant_size() + mem::size_of::<Self>(); let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
match self.key_length { match self.key_length {
Some(key_length) => { Some(key_length) => {
@ -478,8 +477,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> { fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap(); let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
self.write_key_value_with(database, key_length, value.len(), |buffer| { self.write_key_value_with(database, key_length, value.len(), |key_buffer, value_buffer| {
let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
key_buffer.copy_from_slice(key); key_buffer.copy_from_slice(key);
value_buffer.copy_from_slice(value); value_buffer.copy_from_slice(value);
Ok(()) Ok(())
@ -494,7 +492,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
key_value_writer: F, key_value_writer: F,
) -> crate::Result<()> ) -> crate::Result<()>
where where
F: FnOnce(&mut [u8]) -> crate::Result<()>, F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
{ {
let capacity = self.capacity; let capacity = self.capacity;
let refcell = self.producers.get().unwrap(); let refcell = self.producers.get().unwrap();
@ -519,7 +517,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
let header_size = payload_header.header_size(); let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size); let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes); payload_header.serialize_into(header_bytes);
key_value_writer(remaining)?; let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)?;
// We could commit only the used memory. // We could commit only the used memory.
grant.commit(total_length); grant.commit(total_length);
@ -635,12 +634,16 @@ impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> { pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap(); let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
let value_length = CboRoaringBitmapCodec::serialized_size(bitmap); let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
self.sender.write_key_value_with(D::DATABASE, key_length, value_length, |buffer| { self.sender.write_key_value_with(
let (key_buffer, value_buffer) = buffer.split_at_mut(key.len()); D::DATABASE,
key_buffer.copy_from_slice(key); key_length,
CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_buffer)?; value_length,
Ok(()) |key_buffer, value_buffer| {
}) key_buffer.copy_from_slice(key);
CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_buffer)?;
Ok(())
},
)
} }
pub fn delete(&self, key: &[u8]) -> crate::Result<()> { pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
@ -667,25 +670,29 @@ impl FacetDocidsSender<'_, '_> {
FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length, FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length,
}; };
self.sender.write_key_value_with(database, key_length, value_length, |buffer| { self.sender.write_key_value_with(
let (key_out, value_out) = buffer.split_at_mut(key.len()); database,
key_out.copy_from_slice(key); key_length,
value_length,
|key_out, value_out| {
key_out.copy_from_slice(key);
let value_out = match facet_kind { let value_out = match facet_kind {
// We must take the facet group size into account // We must take the facet group size into account
// when we serialize strings and numbers. // when we serialize strings and numbers.
FacetKind::String | FacetKind::Number => { FacetKind::String | FacetKind::Number => {
let (first, remaining) = value_out.split_first_mut().unwrap(); let (first, remaining) = value_out.split_first_mut().unwrap();
*first = 1; *first = 1;
remaining remaining
} }
FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out, FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
}; };
CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?; CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
Ok(()) Ok(())
}) },
)
} }
pub fn delete(&self, key: &[u8]) -> crate::Result<()> { pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
@ -777,32 +784,30 @@ pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl GeoSender<'_, '_> { impl GeoSender<'_, '_> {
pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> { pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
todo!("set rtree from file") self.0
// self.0 .sender
// .send(WriterOperation::DbOperation(DbOperation { .send(ReceiverAction::LargeEntry {
// database: Database::Main, database: Database::Main,
// entry: EntryOperation::Write(KeyValueEntry::from_large_key_value( key: GEO_RTREE_KEY.to_string().into_bytes().into_boxed_slice(),
// GEO_RTREE_KEY.as_bytes(), value,
// value, })
// )), .map_err(|_| SendError(()))
// }))
// .map_err(|_| SendError(()))
} }
pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> { pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> crate::Result<()> {
todo!("serialize directly into bbqueue (as a real roaringbitmap not a cbo)") let key = GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes();
let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
let value_length = bitmap.serialized_size();
// let mut buffer = Vec::new(); self.0.write_key_value_with(
// bitmap.serialize_into(&mut buffer).unwrap(); Database::Main,
key_length,
// self.0 value_length,
// .send(WriterOperation::DbOperation(DbOperation { |key_buffer, value_buffer| {
// database: Database::Main, key_buffer.copy_from_slice(key);
// entry: EntryOperation::Write(KeyValueEntry::from_small_key_value( bitmap.serialize_into(value_buffer)?;
// GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(), Ok(())
// &buffer, },
// )), )
// }))
// .map_err(|_| SendError(()))
} }
} }