Merge branch 'main' into indexer-edition-2024

ManyTheFish 2024-11-06 15:19:18 +01:00
commit 10feeb88f2
1122 changed files with 6265 additions and 5265 deletions

@@ -0,0 +1,328 @@
use std::ops::ControlFlow;
use bumpalo::Bump;
use serde::de::{DeserializeSeed, Deserializer as _, Visitor};
use serde_json::value::RawValue;
use crate::documents::{
validate_document_id_str, DocumentIdExtractionError, FieldIdMapper, PrimaryKey,
};
use crate::fields_ids_map::MutFieldIdMapper;
use crate::{FieldId, UserError};
/// Visits a document to fill the top-level fields of the field id map and to retrieve the external document id.
pub struct FieldAndDocidExtractor<'p, 'indexer, Mapper: MutFieldIdMapper> {
fields_ids_map: &'p mut Mapper,
primary_key: &'p PrimaryKey<'p>,
indexer: &'indexer Bump,
}
impl<'p, 'indexer, Mapper: MutFieldIdMapper> FieldAndDocidExtractor<'p, 'indexer, Mapper> {
pub fn new(
fields_ids_map: &'p mut Mapper,
primary_key: &'p PrimaryKey<'p>,
indexer: &'indexer Bump,
) -> Self {
Self { fields_ids_map, primary_key, indexer }
}
}
impl<'de, 'p, 'indexer: 'de, Mapper: MutFieldIdMapper> Visitor<'de>
for FieldAndDocidExtractor<'p, 'indexer, Mapper>
{
type Value =
Result<Result<DeOrBumpStr<'de, 'indexer>, DocumentIdExtractionError>, crate::UserError>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a map")
}
fn visit_map<A>(mut self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let mut docid = None;
while let Some(((level_name, right), (fid, fields_ids_map))) =
map.next_key_seed(ComponentsSeed {
name: self.primary_key.name(),
visitor: MutFieldIdMapVisitor(self.fields_ids_map),
})?
{
let Some(_fid) = fid else {
return Ok(Err(crate::UserError::AttributeLimitReached));
};
self.fields_ids_map = fields_ids_map;
let value: &'de RawValue = map.next_value()?;
match match_component(level_name, right, value, self.indexer, &mut docid) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(Err(err)) => return Err(serde::de::Error::custom(err)),
ControlFlow::Break(Ok(err)) => return Ok(Ok(Err(err))),
}
}
Ok(Ok(match docid {
Some(docid) => Ok(docid),
None => Err(DocumentIdExtractionError::MissingDocumentId),
}))
}
}
struct NestedPrimaryKeyVisitor<'a, 'bump> {
components: &'a str,
bump: &'bump Bump,
}
impl<'de, 'a, 'bump: 'de> Visitor<'de> for NestedPrimaryKeyVisitor<'a, 'bump> {
type Value = std::result::Result<Option<DeOrBumpStr<'de, 'bump>>, DocumentIdExtractionError>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a map")
}
fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let mut docid = None;
while let Some(((matched_component, right), _)) = map.next_key_seed(ComponentsSeed {
name: self.components,
visitor: serde::de::IgnoredAny,
})? {
let value: &'de RawValue = map.next_value()?;
match match_component(matched_component, right, value, self.bump, &mut docid) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(Err(err)) => return Err(serde::de::Error::custom(err)),
ControlFlow::Break(Ok(err)) => return Ok(Err(err)),
}
}
Ok(Ok(docid))
}
}
/// Either a `&'de str` or a `&'bump str`.
pub enum DeOrBumpStr<'de, 'bump: 'de> {
/// Lifetime of the deserializer
De(&'de str),
/// Lifetime of the allocator
Bump(&'bump str),
}
impl<'de, 'bump: 'de> DeOrBumpStr<'de, 'bump> {
/// Returns a `&'bump str`, possibly allocating to extend its lifetime.
pub fn to_bump(&self, bump: &'bump Bump) -> &'bump str {
match self {
DeOrBumpStr::De(de) => bump.alloc_str(de),
DeOrBumpStr::Bump(bump) => bump,
}
}
/// Returns a `&'de str`.
///
/// This function never allocates because `'bump: 'de`.
pub fn to_de(&self) -> &'de str {
match self {
DeOrBumpStr::De(de) => de,
DeOrBumpStr::Bump(bump) => bump,
}
}
}
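// Usage sketch (illustration only):
//
//     let bump = Bump::new();
//     let docid = DeOrBumpStr::De("movie-42");
//     assert_eq!(docid.to_de(), "movie-42"); // borrows, never allocates
//     assert_eq!(docid.to_bump(&bump), "movie-42"); // copies into the bump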
struct ComponentsSeed<'a, V> {
name: &'a str,
visitor: V,
}
impl<'de, 'a, V: Visitor<'de>> DeserializeSeed<'de> for ComponentsSeed<'a, V> {
type Value = ((&'a str, &'a str), V::Value);
fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ComponentsSeedVisitor<'a, V> {
name: &'a str,
visitor: V,
}
impl<'a, V> ComponentsSeedVisitor<'a, V> {
fn match_str(&self, v: &str) -> (&'a str, &'a str) {
let p = PrimaryKey::Nested { name: self.name };
for (name, right) in p.possible_level_names() {
if name == v {
return (name, right);
}
}
("", self.name)
}
}
impl<'de, 'a, V: Visitor<'de>> Visitor<'de> for ComponentsSeedVisitor<'a, V> {
type Value = ((&'a str, &'a str), V::Value);
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "expecting a string")
}
fn visit_borrowed_str<E>(self, v: &'de str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
let matched = self.match_str(v);
let inner = self.visitor.visit_borrowed_str(v)?;
Ok((matched, inner))
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
let matched = self.match_str(v);
let inner = self.visitor.visit_str(v)?;
Ok((matched, inner))
}
}
deserializer
.deserialize_str(ComponentsSeedVisitor { name: self.name, visitor: self.visitor })
}
}
struct MutFieldIdMapVisitor<'a, Mapper: MutFieldIdMapper>(&'a mut Mapper);
impl<'de, 'a, Mapper: MutFieldIdMapper> Visitor<'de> for MutFieldIdMapVisitor<'a, Mapper> {
type Value = (Option<FieldId>, &'a mut Mapper);
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "expecting a string")
}
fn visit_borrowed_str<E>(self, v: &'de str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok((self.0.insert(v), self.0))
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok((self.0.insert(v), self.0))
}
}
pub struct FieldIdMapVisitor<'a, Mapper: FieldIdMapper>(pub &'a Mapper);
impl<'de, 'a, Mapper: FieldIdMapper> Visitor<'de> for FieldIdMapVisitor<'a, Mapper> {
type Value = Option<FieldId>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "expecting a string")
}
fn visit_borrowed_str<E>(self, v: &'de str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(self.0.id(v))
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(self.0.id(v))
}
}
pub struct DocumentIdVisitor<'indexer>(pub &'indexer Bump);
impl<'de, 'indexer: 'de> Visitor<'de> for DocumentIdVisitor<'indexer> {
type Value = std::result::Result<DeOrBumpStr<'de, 'indexer>, DocumentIdExtractionError>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "an integer or a string")
}
fn visit_borrowed_str<E>(self, v: &'de str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(validate_document_id_str(v)
.ok_or_else(|| {
DocumentIdExtractionError::InvalidDocumentId(UserError::InvalidDocumentId {
document_id: serde_json::Value::String(v.to_owned()),
})
})
.map(DeOrBumpStr::De))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let v = self.0.alloc_str(v);
Ok(match self.visit_borrowed_str(v)? {
Ok(_) => Ok(DeOrBumpStr::Bump(v)),
Err(err) => Err(err),
})
}
fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use std::fmt::Write as _;
let mut out = bumpalo::collections::String::new_in(self.0);
write!(&mut out, "{v}").unwrap();
Ok(Ok(DeOrBumpStr::Bump(out.into_bump_str())))
}
fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use std::fmt::Write as _;
let mut out = bumpalo::collections::String::new_in(self.0);
write!(&mut out, "{v}").unwrap();
Ok(Ok(DeOrBumpStr::Bump(out.into_bump_str())))
}
}
pub fn match_component<'de, 'indexer: 'de>(
first_level_name: &str,
right: &str,
value: &'de RawValue,
bump: &'indexer Bump,
docid: &mut Option<DeOrBumpStr<'de, 'indexer>>,
) -> ControlFlow<Result<DocumentIdExtractionError, serde_json::Error>, ()> {
if first_level_name.is_empty() {
return ControlFlow::Continue(());
}
let value = if right.is_empty() {
match value.deserialize_any(DocumentIdVisitor(bump)).map_err(|_err| {
DocumentIdExtractionError::InvalidDocumentId(UserError::InvalidDocumentId {
document_id: serde_json::to_value(value).unwrap(),
})
}) {
Ok(Ok(value)) => value,
Ok(Err(err)) | Err(err) => return ControlFlow::Break(Ok(err)),
}
} else {
// if right is not empty, recursively extract right components from value
let res = value.deserialize_map(NestedPrimaryKeyVisitor { components: right, bump });
match res {
Ok(Ok(Some(value))) => value,
Ok(Ok(None)) => return ControlFlow::Continue(()),
Ok(Err(err)) => return ControlFlow::Break(Ok(err)),
Err(err) if err.is_data() => return ControlFlow::Continue(()), // we expected the field to be a map, but it was not and that's OK.
Err(err) => return ControlFlow::Break(Err(err)),
}
};
if let Some(_previous_value) = docid.replace(value) {
return ControlFlow::Break(Ok(DocumentIdExtractionError::TooManyDocumentIds(2)));
}
ControlFlow::Continue(())
}
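// Walkthrough (a hypothetical example, not part of the original file): with
// the nested primary key "person.id", `possible_level_names()` is assumed to
// yield ("person", "id") and ("person.id", ""). Visiting the document
// `{"person": {"id": 1}}` matches the top-level key "person" with
// right = "id", recurses into the object through `NestedPrimaryKeyVisitor`,
// and finally parses `1` with `DocumentIdVisitor`, which renders the docid
// as the bump-allocated string "1".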

@@ -0,0 +1,424 @@
use std::cell::{Cell, Ref, RefCell, RefMut};
use std::sync::{Arc, RwLock};
use bumpalo::Bump;
use heed::RoTxn;
use rayon::iter::IndexedParallelIterator;
use super::super::document_change::DocumentChange;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result};
pub trait RefCellExt<T: ?Sized> {
fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError>;
fn try_borrow_mut_or_yield(
&self,
) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>;
fn borrow_or_yield(&self) -> Ref<'_, T> {
self.try_borrow_or_yield().unwrap()
}
fn borrow_mut_or_yield(&self) -> RefMut<'_, T> {
self.try_borrow_mut_or_yield().unwrap()
}
}
impl<T: ?Sized> RefCellExt<T> for RefCell<T> {
fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError> {
// TODO: move this trait and impl elsewhere
loop {
match self.try_borrow() {
Ok(borrow) => break Ok(borrow),
Err(error) => {
tracing::warn!("dynamic borrow failed, yielding to local tasks");
match rayon::yield_local() {
Some(rayon::Yield::Executed) => continue,
_ => return Err(error),
}
}
}
}
}
fn try_borrow_mut_or_yield(
&self,
) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError> {
loop {
match self.try_borrow_mut() {
Ok(borrow) => break Ok(borrow),
Err(error) => {
tracing::warn!("dynamic borrow failed, yielding to local tasks");
match rayon::yield_local() {
Some(rayon::Yield::Executed) => continue,
_ => return Err(error),
}
}
}
}
}
}
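// Usage sketch (illustration only): inside rayon tasks, prefer these helpers
// over plain `borrow()`/`borrow_mut()` so that a local task holding the
// borrow can be driven to completion instead of panicking right away.
//
//     let cell = RefCell::new(0u32);
//     *cell.borrow_mut_or_yield() += 1;
//     assert_eq!(*cell.borrow_or_yield(), 1);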
/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
///
/// The primary example of such a type is `&T`, with `T: !Sync`.
///
/// In the authors' understanding, a type can be `!Send` for two distinct reasons:
///
/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data.
/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior.
///
/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread
/// because you might access it from a different thread, but where you will never access the type **concurrently** from
/// multiple threads.
///
/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing
/// this trait is unsafe.
///
/// # Safety
///
/// Implementers of this trait promise that the following properties hold on the implementing type:
///
/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it.
/// 2. Any operation that can be performed on the type does not depend on the thread that executes it.
///
/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before
/// implementing `MostlySend` on a type, especially a foreign type.
///
/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`).
/// - An example of a type that doesn't verify (1) is thread-local data.
/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that
/// a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this
/// invariant will cause Undefined Behavior
/// (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)).
///
/// It is **always safe** to implement this trait on a type that is `Send`, but no blanket impl is provided due to
/// coherence limitations. Use the [`FullySend`] wrapper in this situation.
pub unsafe trait MostlySend {}
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct FullySend<T>(pub T);
// SAFETY: a type that is **fully** send is always mostly send as well.
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
impl<T> FullySend<T> {
pub fn into(self) -> T {
self.0
}
}
impl<T> From<T> for FullySend<T> {
fn from(value: T) -> Self {
Self(value)
}
}
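// Sketch of the intended split (illustration only): a plain `Send` type is
// wrapped in `FullySend` because no blanket impl exists, while "mostly send"
// types compose through the impls above.
//
//     let counter: FullySend<u32> = 0u32.into();
//     // `RefCell<Option<FullySend<u32>>>: MostlySend` holds by composition.
//     let cell = RefCell::new(Some(counter));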
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MostlySendWrapper<T>(T);
impl<T: MostlySend> MostlySendWrapper<T> {
/// # Safety
///
/// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization
unsafe fn new(t: T) -> Self {
Self(t)
}
fn as_ref(&self) -> &T {
&self.0
}
fn as_mut(&mut self) -> &mut T {
&mut self.0
}
fn into_inner(self) -> T {
self.0
}
}
/// # Safety
///
/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available
/// from any thread.
/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently.
unsafe impl<T: MostlySend> Send for MostlySendWrapper<T> {}
/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s.
#[derive(Default)]
pub struct ThreadLocal<T: MostlySend> {
inner: thread_local::ThreadLocal<MostlySendWrapper<T>>,
// FIXME: this should be necessary (to make `ThreadLocal<T>` itself `!Send`)
//_no_send: PhantomData<*mut ()>,
}
impl<T: MostlySend> ThreadLocal<T> {
pub fn new() -> Self {
Self { inner: thread_local::ThreadLocal::new() }
}
pub fn with_capacity(capacity: usize) -> Self {
Self { inner: thread_local::ThreadLocal::with_capacity(capacity) }
}
pub fn clear(&mut self) {
self.inner.clear()
}
pub fn get(&self) -> Option<&T> {
self.inner.get().map(|t| t.as_ref())
}
pub fn get_or<F>(&self, create: F) -> &T
where
F: FnOnce() -> T,
{
// TODO: move ThreadLocal, MostlySend, FullySend to a dedicated file
self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref()
}
pub fn get_or_try<F, E>(&self, create: F) -> std::result::Result<&T, E>
where
F: FnOnce() -> std::result::Result<T, E>,
{
self.inner
.get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) })
.map(MostlySendWrapper::as_ref)
}
pub fn get_or_default(&self) -> &T
where
T: Default,
{
self.inner.get_or_default().as_ref()
}
pub fn iter_mut(&mut self) -> IterMut<T> {
IterMut(self.inner.iter_mut())
}
}
impl<T: MostlySend> IntoIterator for ThreadLocal<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter(self.inner.into_iter())
}
}
pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper<T>>);
impl<'a, T: MostlySend> Iterator for IterMut<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|t| t.as_mut())
}
}
pub struct IntoIter<T: MostlySend>(thread_local::IntoIter<MostlySendWrapper<T>>);
impl<T: MostlySend> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|t| t.into_inner())
}
}
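// Usage sketch (illustration only): one `Bump` per worker thread, without
// requiring `Bump: Sync`.
//
//     let allocs: ThreadLocal<FullySend<Bump>> = ThreadLocal::new();
//     let bump: &Bump = &allocs.get_or(|| FullySend(Bump::new())).0;
//     let s = bump.alloc_str("per-thread data");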
pub struct DocumentChangeContext<
'doc, // covariant lifetime of a single `process` call
'extractor: 'doc, // invariant lifetime of the extractor_allocs
'fid: 'doc, // invariant lifetime of the new_fields_ids_map
'indexer: 'doc, // covariant lifetime of objects that outlive a single `process` call
T: MostlySend,
> {
/// The index we're indexing in
pub index: &'indexer Index,
/// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
/// inside of the DB.
pub db_fields_ids_map: &'indexer FieldsIdsMap,
/// A transaction providing data from the DB before all indexing operations
pub txn: RoTxn<'indexer>,
/// Global field id map that is up to date with the current state of the indexing process.
///
/// - Inserting a field will take a lock
/// - Retrieving a field may take a lock as well
pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
/// Data allocated in this allocator is cleared between each call to `process`.
pub doc_alloc: Bump,
/// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
pub extractor_alloc: &'extractor Bump,
/// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
/// Extractor-specific data
pub data: &'doc T,
}
impl<
'doc, // covariant lifetime of a single `process` call
'data: 'doc, // invariant on T lifetime of the datastore
'extractor: 'doc, // invariant lifetime of extractor_allocs
'fid: 'doc, // invariant lifetime of fields ids map
'indexer: 'doc, // covariant lifetime of objects that survive a `process` call
T: MostlySend,
> DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T>
{
#[allow(clippy::too_many_arguments)]
pub fn new<F>(
index: &'indexer Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: &'fid RwLock<FieldsIdsMap>,
extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
datastore: &'data ThreadLocal<T>,
fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
init_data: F,
) -> Result<Self>
where
F: FnOnce(&'extractor Bump) -> Result<T>,
{
let doc_alloc =
doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024))));
let doc_alloc = doc_alloc.0.take();
let fields_ids_map = fields_ids_map_store
.get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
let fields_ids_map = &fields_ids_map.0;
let extractor_alloc = extractor_allocs.get_or_default();
let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
let txn = index.read_txn()?;
Ok(DocumentChangeContext {
index,
txn,
db_fields_ids_map,
new_fields_ids_map: fields_ids_map,
doc_alloc,
extractor_alloc: &extractor_alloc.0,
data,
doc_allocs,
})
}
}
/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
pub trait Extractor<'extractor>: Sync {
type Data: MostlySend;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
fn process<'doc>(
&'doc self,
change: DocumentChange<'doc>,
context: &'doc DocumentChangeContext<Self::Data>,
) -> Result<()>;
}
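// A minimal `Extractor` sketch (hypothetical, for illustration): counts the
// changes each thread processes in a per-thread `RefCell`.
//
//     struct CountingExtractor;
//     impl<'extractor> Extractor<'extractor> for CountingExtractor {
//         type Data = FullySend<RefCell<usize>>;
//         fn init_data(&self, _alloc: &'extractor Bump) -> Result<Self::Data> {
//             Ok(FullySend(RefCell::new(0)))
//         }
//         fn process(
//             &self,
//             _change: DocumentChange,
//             context: &DocumentChangeContext<Self::Data>,
//         ) -> Result<()> {
//             *context.data.0.borrow_mut_or_yield() += 1;
//             Ok(())
//         }
//     }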
pub trait DocumentChanges<'pl /* lifetime of the underlying payload */>: Sync {
type Item: Send;
fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item>;
fn item_to_document_change<'doc, /* lifetime of a single `process` call */ T: MostlySend>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
item: Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
'pl: 'doc; // the payload must survive the process calls
}
#[derive(Clone, Copy)]
pub struct IndexingContext<
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation
'index, // covariant lifetime of the index
> {
pub index: &'index Index,
pub db_fields_ids_map: &'indexer FieldsIdsMap,
pub new_fields_ids_map: &'fid RwLock<FieldsIdsMap>,
pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
}
pub fn for_each_document_change<
'pl, // covariant lifetime of the underlying payload
'extractor, // invariant lifetime of extractor_alloc
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing
'data, // invariant on EX::Data lifetime of datastore
'index, // covariant lifetime of the index
EX,
DC: DocumentChanges<'pl>,
>(
document_changes: &DC,
extractor: &EX,
IndexingContext {
index,
db_fields_ids_map,
new_fields_ids_map,
doc_allocs,
fields_ids_map_store,
}: IndexingContext<'fid, 'indexer, 'index>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
datastore: &'data ThreadLocal<EX::Data>,
) -> Result<()>
where
EX: Extractor<'extractor>,
{
eprintln!("We are resetting the extractor allocators");
// Clean up and reuse the extractor allocs
for extractor_alloc in extractor_allocs.iter_mut() {
eprintln!("\tWith {} bytes resetted", extractor_alloc.0.allocated_bytes());
extractor_alloc.0.reset();
}
let pi = document_changes.iter();
pi.try_arc_for_each_try_init(
|| {
DocumentChangeContext::new(
index,
db_fields_ids_map,
new_fields_ids_map,
extractor_allocs,
doc_allocs,
datastore,
fields_ids_map_store,
move |index_alloc| extractor.init_data(index_alloc),
)
},
|context, item| {
// Clean up and reuse the document-specific allocator
context.doc_alloc.reset();
let Some(change) =
document_changes.item_to_document_change(context, item).map_err(Arc::new)?
else {
return Ok(());
};
let res = extractor.process(change, context).map_err(Arc::new);
// send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
res
},
)
}

@@ -0,0 +1,172 @@
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
use roaring::RoaringBitmap;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
use crate::documents::PrimaryKey;
use crate::update::new::{Deletion, DocumentChange};
use crate::{DocumentId, Result};
#[derive(Default)]
pub struct DocumentDeletion {
pub to_delete: RoaringBitmap,
}
impl DocumentDeletion {
pub fn new() -> Self {
Self { to_delete: Default::default() }
}
pub fn delete_documents_by_docids(&mut self, docids: RoaringBitmap) {
self.to_delete |= docids;
}
pub fn into_changes<'indexer>(
self,
indexer: &'indexer Bump,
primary_key: PrimaryKey<'indexer>,
) -> DocumentDeletionChanges<'indexer> {
let to_delete: bumpalo::collections::Vec<_> =
self.to_delete.into_iter().collect_in(indexer);
let to_delete = to_delete.into_bump_slice();
DocumentDeletionChanges { to_delete, primary_key }
}
}
pub struct DocumentDeletionChanges<'indexer> {
to_delete: &'indexer [DocumentId],
primary_key: PrimaryKey<'indexer>,
}
impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
type Item = DocumentId;
fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
self.to_delete.into_par_iter().copied()
}
fn item_to_document_change<
'doc, // lifetime of a single `process` call
T: MostlySend,
>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
docid: Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
'pl: 'doc, // the payload must survive the process calls
{
let current = context.index.document(&context.txn, docid)?;
let external_document_id = self.primary_key.extract_docid_from_db(
current,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
Ok(Some(DocumentChange::Deletion(Deletion::create(docid, external_document_id))))
}
}
#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::marker::PhantomData;
use std::sync::RwLock;
use bumpalo::Bump;
use crate::index::tests::TempIndex;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, Extractor, IndexingContext, MostlySend,
ThreadLocal,
};
use crate::update::new::indexer::DocumentDeletion;
use crate::update::new::DocumentChange;
use crate::DocumentId;
#[test]
fn test_deletions() {
struct DeletionWithData<'extractor> {
deleted: RefCell<
hashbrown::HashSet<DocumentId, hashbrown::DefaultHashBuilder, &'extractor Bump>,
>,
}
unsafe impl<'extractor> MostlySend for DeletionWithData<'extractor> {}
struct TrackDeletion<'extractor>(PhantomData<&'extractor ()>);
impl<'extractor> Extractor<'extractor> for TrackDeletion<'extractor> {
type Data = DeletionWithData<'extractor>;
fn init_data(&self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
let deleted = RefCell::new(hashbrown::HashSet::new_in(extractor_alloc));
Ok(DeletionWithData { deleted })
}
fn process(
&self,
change: DocumentChange,
context: &DocumentChangeContext<Self::Data>,
) -> crate::Result<()> {
context.data.deleted.borrow_mut().insert(change.docid());
Ok(())
}
}
let mut deletions = DocumentDeletion::new();
deletions.delete_documents_by_docids(vec![0, 2, 42].into_iter().collect());
let indexer = Bump::new();
let index = TempIndex::new();
let rtxn = index.read_txn().unwrap();
let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let fields_ids_map = RwLock::new(db_fields_ids_map.clone());
let fields_ids_map_store = ThreadLocal::new();
let mut extractor_allocs = ThreadLocal::new();
let doc_allocs = ThreadLocal::new();
let deletion_tracker = TrackDeletion(PhantomData);
let changes = deletions
.into_changes(&indexer, crate::documents::PrimaryKey::Flat { name: "id", field_id: 0 });
let context = IndexingContext {
index: &index,
db_fields_ids_map: &db_fields_ids_map,
new_fields_ids_map: &fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
};
for _ in 0..3 {
let datastore = ThreadLocal::new();
for_each_document_change(
&changes,
&deletion_tracker,
context,
&mut extractor_allocs,
&datastore,
)
.unwrap();
for (index, data) in datastore.into_iter().enumerate() {
println!("deleted by {index}: {:?}", data.deleted.borrow());
}
for alloc in extractor_allocs.iter_mut() {
alloc.0.reset();
}
}
}
}

@@ -0,0 +1,401 @@
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use heed::RoTxn;
use memmap2::Mmap;
use rayon::iter::IntoParallelIterator;
use serde_json::value::RawValue;
use IndexDocumentsMethod as Idm;
use super::super::document_change::DocumentChange;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
use crate::documents::PrimaryKey;
use crate::update::new::document::DocumentFromVersions;
use crate::update::new::document_change::Versions;
use crate::update::new::{Deletion, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
pub struct DocumentOperation<'pl> {
operations: Vec<Payload<'pl>>,
index_documents_method: IndexDocumentsMethod,
}
pub struct DocumentOperationChanges<'pl> {
docids_version_offsets: &'pl [(&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]))],
index_documents_method: IndexDocumentsMethod,
}
pub enum Payload<'pl> {
Addition(&'pl [u8]),
Deletion(&'pl [&'pl str]),
}
pub struct PayloadStats {
pub document_count: usize,
pub bytes: u64,
}
#[derive(Clone)]
pub enum InnerDocOp<'pl> {
Addition(DocumentOffset<'pl>),
Deletion,
}
/// Represents an offset where a document lives
/// in an mmapped payload file.
#[derive(Clone)]
pub struct DocumentOffset<'pl> {
/// The document bytes from the mmapped payload file.
pub content: &'pl [u8],
}
impl<'pl> DocumentOperation<'pl> {
pub fn new(method: IndexDocumentsMethod) -> Self {
Self { operations: Default::default(), index_documents_method: method }
}
/// TODO please give me a type
/// The payload is expected to be a stream of JSON documents written back to back
pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<PayloadStats> {
payload.advise(memmap2::Advice::Sequential)?;
let document_count =
memchr::memmem::find_iter(&payload[..], "}{").count().saturating_add(1);
self.operations.push(Payload::Addition(&payload[..]));
Ok(PayloadStats { bytes: payload.len() as u64, document_count })
}
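// Counting sketch (illustration only): the payload is raw JSON objects
// written back to back, so `{"id":1}{"id":2}{"id":3}` contains two `}{`
// boundaries and is counted as three documents. This is only an estimate:
// a literal `}{` inside a string value would be counted as a boundary too.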
pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) {
self.operations.push(Payload::Deletion(to_delete))
}
pub fn into_changes(
self,
indexer: &'pl Bump,
index: &Index,
rtxn: &RoTxn,
primary_key: &PrimaryKey,
new_fields_ids_map: &mut FieldsIdsMap,
) -> Result<DocumentOperationChanges<'pl>> {
// will contain nodes from the intermediate hashmap
let document_changes_alloc = Bump::with_capacity(1024 * 1024 * 1024); // 1 GiB
let documents_ids = index.documents_ids(rtxn)?;
let mut available_docids = AvailableIds::new(&documents_ids);
let mut docids_version_offsets =
hashbrown::HashMap::<&'pl str, _, _, _>::new_in(&document_changes_alloc);
for operation in self.operations {
match operation {
Payload::Addition(payload) => {
let mut iter =
serde_json::Deserializer::from_slice(payload).into_iter::<&RawValue>();
// TODO: manage the error
let mut previous_offset = 0;
while let Some(document) =
iter.next().transpose().map_err(UserError::SerdeJson)?
{
let external_document_id = primary_key.extract_fields_and_docid(
document,
new_fields_ids_map,
indexer,
)?;
let external_document_id = external_document_id.to_de();
let current_offset = iter.byte_offset();
let document_operation = InnerDocOp::Addition(DocumentOffset {
content: &payload[previous_offset..current_offset],
});
match docids_version_offsets.get_mut(external_document_id) {
None => {
let (docid, is_new) = match index
.external_documents_ids()
.get(rtxn, external_document_id)?
{
Some(docid) => (docid, false),
None => (
available_docids.next().ok_or(Error::UserError(
UserError::DocumentLimitReached,
))?,
true,
),
};
docids_version_offsets.insert(
external_document_id,
(
(docid, is_new),
bumpalo::vec![in indexer; document_operation],
),
);
}
Some((_, offsets)) => {
let useless_previous_addition = match self.index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => {
MergeDocumentForReplacement::USELESS_PREVIOUS_CHANGES
}
IndexDocumentsMethod::UpdateDocuments => {
MergeDocumentForUpdates::USELESS_PREVIOUS_CHANGES
}
};
if useless_previous_addition {
offsets.clear();
}
offsets.push(document_operation);
}
}
previous_offset = iter.byte_offset();
}
}
Payload::Deletion(to_delete) => {
for external_document_id in to_delete {
match docids_version_offsets.get_mut(external_document_id) {
None => {
let (docid, is_new) = match index
.external_documents_ids()
.get(rtxn, external_document_id)?
{
Some(docid) => (docid, false),
None => (
available_docids.next().ok_or(Error::UserError(
UserError::DocumentLimitReached,
))?,
true,
),
};
docids_version_offsets.insert(
external_document_id,
(
(docid, is_new),
bumpalo::vec![in indexer; InnerDocOp::Deletion],
),
);
}
Some((_, offsets)) => {
offsets.clear();
offsets.push(InnerDocOp::Deletion);
}
}
}
}
}
}
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = docids_version_offsets
.drain()
.map(|(item, (docid, v))| (item, (docid, v.into_bump_slice())))
.collect_in(indexer);
// Reorder the offsets to make sure we iterate on the file sequentially
let sort_function_key = match self.index_documents_method {
Idm::ReplaceDocuments => MergeDocumentForReplacement::sort_key,
Idm::UpdateDocuments => MergeDocumentForUpdates::sort_key,
};
// And finally sort them
docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops));
let docids_version_offsets = docids_version_offsets.into_bump_slice();
Ok(DocumentOperationChanges {
docids_version_offsets,
index_documents_method: self.index_documents_method,
})
}
}
impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
type Item = &'pl (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]));
fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
self.docids_version_offsets.into_par_iter()
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
item: Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
'pl: 'doc,
{
let document_merge_function = match self.index_documents_method {
Idm::ReplaceDocuments => MergeDocumentForReplacement::merge,
Idm::UpdateDocuments => MergeDocumentForUpdates::merge,
};
let (external_doc, ((internal_docid, is_new), operations)) = *item;
let change = document_merge_function(
internal_docid,
external_doc,
is_new,
&context.doc_alloc,
operations,
)?;
Ok(change)
}
}
trait MergeChanges {
/// Whether a new addition to the list of operations makes all previous changes useless.
const USELESS_PREVIOUS_CHANGES: bool;
/// Returns a key used to order the document operations so that the payload files are read sequentially.
fn sort_key(docops: &[InnerDocOp]) -> usize;
fn merge<'doc>(
docid: DocumentId,
external_docid: &'doc str,
is_new: bool,
doc_alloc: &'doc Bump,
operations: &'doc [InnerDocOp],
) -> Result<Option<DocumentChange<'doc>>>;
}
struct MergeDocumentForReplacement;
impl MergeChanges for MergeDocumentForReplacement {
const USELESS_PREVIOUS_CHANGES: bool = true;
/// Reorders to read only the last change.
fn sort_key(docops: &[InnerDocOp]) -> usize {
let f = |ido: &_| match ido {
InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
InnerDocOp::Deletion => None,
};
docops.iter().rev().find_map(f).unwrap_or(0)
}
/// Returns only the most recent version of a document based on the updates from the payloads.
///
/// This function is only meant to be used when doing a replacement and not an update.
fn merge<'doc>(
docid: DocumentId,
external_doc: &'doc str,
is_new: bool,
doc_alloc: &'doc Bump,
operations: &'doc [InnerDocOp],
) -> Result<Option<DocumentChange<'doc>>> {
match operations.last() {
Some(InnerDocOp::Addition(DocumentOffset { content })) => {
let document = serde_json::from_slice(content).unwrap();
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
.map_err(UserError::SerdeJson)?;
let document = document.into_bump_slice();
let document = DocumentFromVersions::new(Versions::Single(document));
if is_new {
Ok(Some(DocumentChange::Insertion(Insertion::create(
docid,
external_doc,
document,
))))
} else {
Ok(Some(DocumentChange::Update(Update::create(
docid,
external_doc,
document,
true,
))))
}
}
Some(InnerDocOp::Deletion) => {
return if is_new {
Ok(None)
} else {
let deletion = Deletion::create(docid, external_doc);
Ok(Some(DocumentChange::Deletion(deletion)))
};
}
None => unreachable!("We must not have an empty set of operations on a document"),
}
}
}
struct MergeDocumentForUpdates;
impl MergeChanges for MergeDocumentForUpdates {
const USELESS_PREVIOUS_CHANGES: bool = false;
/// Reorders to read the first changes first so that the payload file is read sequentially.
fn sort_key(docops: &[InnerDocOp]) -> usize {
let f = |ido: &_| match ido {
InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize),
InnerDocOp::Deletion => None,
};
docops.iter().find_map(f).unwrap_or(0)
}
/// Reads the previous version of a document from the database and the new versions
/// from the payloads, and merges them into a single `DocumentChange`.
///
/// This function is only meant to be used when doing an update and not a replacement.
fn merge<'doc>(
docid: DocumentId,
external_docid: &'doc str,
is_new: bool,
doc_alloc: &'doc Bump,
operations: &'doc [InnerDocOp],
) -> Result<Option<DocumentChange<'doc>>> {
if operations.is_empty() {
unreachable!("We must not have empty set of operations on a document");
}
let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion));
let operations = &operations[last_deletion.map_or(0, |i| i + 1)..];
let has_deletion = last_deletion.is_some();
if operations.is_empty() {
return if !is_new {
let deletion = Deletion::create(docid, external_docid);
Ok(Some(DocumentChange::Deletion(deletion)))
} else {
Ok(None)
};
}
let mut versions = bumpalo::collections::Vec::with_capacity_in(operations.len(), doc_alloc);
for operation in operations {
let DocumentOffset { content } = match operation {
InnerDocOp::Addition(offset) => offset,
InnerDocOp::Deletion => {
unreachable!("Deletion in document operations")
}
};
let document = serde_json::from_slice(content).unwrap();
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
.map_err(UserError::SerdeJson)?;
let document = document.into_bump_slice();
versions.push(document);
}
let versions = versions.into_bump_slice();
let versions = match versions {
[single] => Versions::Single(single),
versions => Versions::Multiple(versions),
};
let document = DocumentFromVersions::new(versions);
if is_new {
Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, document))))
} else {
Ok(Some(DocumentChange::Update(Update::create(
docid,
external_docid,
document,
has_deletion,
))))
}
}
}
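// Merge-semantics sketch (illustration only), for a single document:
//   ReplaceDocuments: [Add(v1), Del, Add(v2), Add(v3)] is trimmed to [Add(v3)]
//     while collecting payloads, since each addition makes every previous
//     change useless.
//   UpdateDocuments:  [Add(v1), Del, Add(v2), Add(v3)] becomes
//     [Del, Add(v2), Add(v3)]; `merge` then drops everything up to the last
//     deletion and merges v2 and v3 without reading the previous DB version.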

@@ -0,0 +1,519 @@
use std::cmp::Ordering;
use std::sync::RwLock;
use std::thread::{self, Builder};
use big_s::S;
use document_changes::{
for_each_document_change, DocumentChanges, FullySend, IndexingContext, ThreadLocal,
};
pub use document_deletion::DocumentDeletion;
pub use document_operation::DocumentOperation;
use heed::types::{Bytes, DecodeIgnore, Str};
use heed::{RoTxn, RwTxn};
use itertools::{merge_join_by, EitherOrBoth};
pub use partial_dump::PartialDump;
use rayon::ThreadPool;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
use super::channel::*;
use super::extract::*;
use super::facet_search_builder::FacetSearchBuilder;
use super::merger::{FacetDatabases, FacetFieldIdsDelta};
use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd;
use crate::update::new::word_fst_builder::{PrefixData, WordFstBuilder};
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids};
use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
pub mod de;
pub mod document_changes;
mod document_deletion;
mod document_operation;
mod partial_dump;
mod update_by_function;
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
///
/// TODO return stats
pub fn index<'pl, 'indexer, 'index, DC>(
wtxn: &mut RwTxn,
index: &'index Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
new_fields_ids_map: FieldsIdsMap,
new_primary_key: Option<PrimaryKey<'pl>>,
pool: &ThreadPool,
document_changes: &DC,
) -> Result<()>
where
DC: DocumentChanges<'pl>,
{
// TODO find a better channel limit
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
let indexing_context = IndexingContext {
index,
db_fields_ids_map,
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
};
thread::scope(|s| -> crate::Result<_> {
let indexer_span = tracing::Span::current();
// TODO manage the errors correctly
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
// TODO: we need to create a function that collects and compresses documents.
let rtxn = index.read_txn().unwrap();
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
let mut documents_ids = index.documents_ids(&rtxn)?;
let delta_documents_ids = datastore.into_iter().map(|FullySend(d)| d.into_inner()).reduce(DelAddRoaringBitmap::merge).unwrap_or_default();
delta_documents_ids.apply_to(&mut documents_ids);
extractor_sender.send_documents_ids(documents_ids).unwrap();
// document_sender.finish().unwrap();
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
let facet_field_ids_delta;
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter();
facet_field_ids_delta = merge_and_send_facet_docids(
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
let WordDocidsCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
// TODO Word Docids Merger
// extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
index,
extractor_sender.docids::<WordDocids>(),
)?;
}
// Word Fid Docids Merging
// extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter();
merge_and_send_docids(
word_fid_docids,
index.word_fid_docids.remap_types(),
index,
extractor_sender.docids::<WordFidDocids>()
)?;
}
// Exact Word Docids Merging
// extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter();
merge_and_send_docids(
exact_word_docids,
index.exact_word_docids.remap_types(),
index,
extractor_sender.docids::<ExactWordDocids>(),
)?;
}
// Word Position Docids Merging
// extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter();
merge_and_send_docids(
word_position_docids,
index.word_position_docids.remap_types(),
index,
extractor_sender.docids::<WordPositionDocids>(),
)?;
}
// Fid Word Count Docids Merging
// extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let _entered = span.enter();
merge_and_send_docids(
fid_word_count_docids,
index.field_id_word_count_docids.remap_types(),
index,
extractor_sender.docids::<FidWordCountDocids>(),
)?;
}
}
// run the proximity extraction only if the precision is by word
// this works only if the settings didn't change during this transaction.
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
if proximity_precision == ProximityPrecision::ByWord {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter();
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
index,
extractor_sender.docids::<WordPairProximityDocids>(),
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter();
}
// TODO THIS IS TOO MUCH
// - [ ] Extract fieldid docid facet number
// - [ ] Extract fieldid docid facet string
// - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
// TODO Inverted Indexes again
// - [x] Extract fieldid facet isempty docids
// - [x] Extract fieldid facet isnull docids
// - [x] Extract fieldid facet exists docids
// TODO This is the normal system
// - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
// TODO use None when needed
Result::Ok(facet_field_ids_delta)
})
})?;
for operation in writer_receiver {
let database = operation.database(index);
match operation.entry() {
EntryOperation::Delete(e) => {
if !database.delete(wtxn, e.entry())? {
unreachable!("We tried to delete an unknown key")
}
}
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
}
}
// TODO: handle the panicking threads
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
compute_prefix_database(index, wtxn, prefix_delta)?;
}
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
Result::Ok(())
})?;
// dropped here so that we can call into_inner() on new_fields_ids_map below
drop(fields_ids_map_store);
let fields_ids_map = new_fields_ids_map.into_inner().unwrap();
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
if let Some(new_primary_key) = new_primary_key {
index.put_primary_key(wtxn, new_primary_key.name())?;
}
// used to update the localized and weighted maps while sharing the update code with the settings pipeline.
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
inner_index_settings.recompute_facets(wtxn, index)?;
inner_index_settings.recompute_searchables(wtxn, index)?;
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn compute_prefix_database(
index: &Index,
wtxn: &mut RwTxn,
prefix_delta: PrefixDelta,
) -> Result<()> {
eprintln!("prefix_delta: {:?}", &prefix_delta);
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
// Compute exact word prefix docids
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix fid docids
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix position docids
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
let rtxn = index.read_txn()?;
let words_fst = index.words_fst(&rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(&rtxn)?;
word_fst_builder.with_prefix_settings(prefix_settings);
let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>();
let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>();
for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) {
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
}) {
match eob {
EitherOrBoth::Both(lhs, rhs) => {
let (word, lhs_bytes) = lhs?;
let (_, rhs_bytes) = rhs?;
if lhs_bytes != rhs_bytes {
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
}
}
EitherOrBoth::Left(result) => {
let (word, _) = result?;
word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?;
}
EitherOrBoth::Right(result) => {
let (word, _) = result?;
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
}
}
}
let span = tracing::trace_span!(target: "indexing::documents::merge", "words_fst");
let _entered = span.enter();
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
// extractor_sender.main().write_words_fst(word_fst_mmap).unwrap();
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
// extractor_sender.main().write_words_prefixes_fst(prefixes_fst_mmap).unwrap();
index.main.remap_types::<Str, Bytes>().put(
wtxn,
WORDS_PREFIXES_FST_KEY,
&prefixes_fst_mmap,
)?;
Ok(Some(prefix_delta))
} else {
Ok(None)
}
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")]
fn compute_facet_search_database(
index: &Index,
wtxn: &mut RwTxn,
global_fields_ids_map: GlobalFieldsIdsMap,
) -> Result<()> {
let rtxn = index.read_txn()?;
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
let mut facet_search_builder = FacetSearchBuilder::new(
global_fields_ids_map,
localized_attributes_rules.unwrap_or_default(),
);
let previous_facet_id_string_docids = index
.facet_id_string_docids
.iter(&rtxn)?
.remap_data_type::<DecodeIgnore>()
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
let current_facet_id_string_docids = index
.facet_id_string_docids
.iter(wtxn)?
.remap_data_type::<DecodeIgnore>()
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
for eob in merge_join_by(
previous_facet_id_string_docids,
current_facet_id_string_docids,
|lhs, rhs| match (lhs, rhs) {
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
},
) {
match eob {
EitherOrBoth::Both(lhs, rhs) => {
let (_, _) = lhs?;
let (_, _) = rhs?;
}
EitherOrBoth::Left(result) => {
let (key, _) = result?;
facet_search_builder
.register_from_key(DelAdd::Deletion, key.left_bound.as_ref())?;
}
EitherOrBoth::Right(result) => {
let (key, _) = result?;
facet_search_builder
.register_from_key(DelAdd::Addition, key.left_bound.as_ref())?;
}
}
}
facet_search_builder.merge_and_write(index, wtxn, &rtxn)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
fn compute_facet_level_database(
index: &Index,
wtxn: &mut RwTxn,
facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()> {
eprintln!("facet_field_ids_delta: {:?}", &facet_field_ids_delta);
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0(
index,
modified_facet_string_ids,
FacetType::String,
)
.execute(wtxn)?;
}
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0(
index,
modified_facet_number_ids,
FacetType::Number,
)
.execute(wtxn)?;
}
Ok(())
}
/// Returns the primary key that has already been set for this index or the
/// one we will guess by searching for the first key that ends with "id",
/// and whether the primary key changed.
/// TODO move this elsewhere
pub fn retrieve_or_guess_primary_key<'a>(
rtxn: &'a RoTxn<'a>,
index: &Index,
new_fields_ids_map: &mut FieldsIdsMap,
primary_key_from_op: Option<&'a str>,
first_document: Option<&'a TopLevelMap<'a>>,
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
// make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.
// do we have an existing declared primary key?
let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? {
// did we request a primary key in the operation?
match primary_key_from_op {
// we did, and it is different from the DB one
Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
// is the index empty?
if index.number_of_documents(rtxn)? == 0 {
// change primary key
(primary_key_from_op, true)
} else {
return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
primary_key_from_db.to_string(),
)));
}
}
_ => (primary_key_from_db, false),
}
} else {
// no primary key in the DB => let's set one
// did we request a primary key in the operation?
let primary_key = if let Some(primary_key_from_op) = primary_key_from_op {
// set primary key from operation
primary_key_from_op
} else {
// guess primary key
let first_document = match first_document {
Some(document) => document,
// matches the previous indexer's behavior: no pk set + an empty payload => NoPrimaryKeyCandidateFound
None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
};
let mut guesses: Vec<&str> = first_document
.keys()
.map(AsRef::as_ref)
.filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
.collect();
// sort the keys in lexicographical order, so that fields are always in the same order.
guesses.sort_unstable();
match guesses.as_slice() {
[] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
[name] => {
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
*name
}
multiple => {
return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
candidates: multiple
.iter()
.map(|candidate| candidate.to_string())
.collect(),
}))
}
}
};
(primary_key, true)
};
match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) {
Ok(primary_key) => Ok(Ok((primary_key, has_changed))),
Err(err) => Ok(Err(err)),
}
}
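// Guessing sketch (illustration only): with no primary key in the DB and none
// in the operation, a first document like `{"title": "...", "uid": 1,
// "product_id": 2}` yields the sorted candidates ["product_id", "uid"], so
// `MultiplePrimaryKeyCandidatesFound` is returned; with a single candidate,
// it is inferred and logged.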

@@ -0,0 +1,83 @@
use std::ops::DerefMut;
use rayon::iter::IndexedParallelIterator;
use serde_json::value::RawValue;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend, RefCellExt};
use crate::documents::PrimaryKey;
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
use crate::update::new::document::DocumentFromVersions;
use crate::update::new::document_change::Versions;
use crate::update::new::{DocumentChange, Insertion};
use crate::{Error, InternalError, Result, UserError};
pub struct PartialDump<I> {
iter: I,
}
impl<I> PartialDump<I> {
pub fn new_from_jsonlines(iter: I) -> Self {
PartialDump { iter }
}
pub fn into_changes<'index>(
self,
concurrent_available_ids: &'index ConcurrentAvailableIds,
primary_key: &'index PrimaryKey,
) -> PartialDumpChanges<'index, I> {
// Note for future self:
// - We recommend sending chunks of documents in this `PartialDumpIndexer`; we therefore need to create a custom `take_while_size` method (one that doesn't drop items).
PartialDumpChanges { iter: self.iter, concurrent_available_ids, primary_key }
}
}
pub struct PartialDumpChanges<'doc, I> {
iter: I,
concurrent_available_ids: &'doc ConcurrentAvailableIds,
primary_key: &'doc PrimaryKey<'doc>,
}
impl<'index, Iter> DocumentChanges<'index> for PartialDumpChanges<'index, Iter>
where
Iter: IndexedParallelIterator<Item = Box<RawValue>> + Clone + Sync + 'index,
{
type Item = Box<RawValue>;
fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
self.iter.clone()
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
document: Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
'index: 'doc,
{
let doc_alloc = &context.doc_alloc;
let docid = match self.concurrent_available_ids.next() {
Some(id) => id,
None => return Err(Error::UserError(UserError::DocumentLimitReached)),
};
let mut fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let fields_ids_map = fields_ids_map.deref_mut();
let document = doc_alloc.alloc_str(document.get());
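// SAFETY: `RawValue` is a `#[repr(transparent)]` wrapper around `str`, and
// the string was copied verbatim from an already-validated `RawValue`.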
let document: &RawValue = unsafe { std::mem::transmute(document) };
let external_document_id =
self.primary_key.extract_fields_and_docid(document, fields_ids_map, doc_alloc)?;
let external_document_id = external_document_id.to_de();
let document = raw_collections::RawMap::from_raw_value(document, doc_alloc)
.map_err(InternalError::SerdeJson)?;
let document = document.into_bump_slice();
let document = DocumentFromVersions::new(Versions::Single(document));
let insertion = Insertion::create(docid, external_document_id, document);
Ok(Some(DocumentChange::Insertion(insertion)))
}
}

@@ -0,0 +1,204 @@
use raw_collections::RawMap;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
use roaring::RoaringBitmap;
use super::document_changes::{DocumentChangeContext, MostlySend, RefCellExt};
use super::DocumentChanges;
use crate::documents::Error::InvalidDocumentFormat;
use crate::documents::PrimaryKey;
use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::update::new::document::DocumentFromVersions;
use crate::update::new::document_change::Versions;
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
pub struct UpdateByFunction {
documents: RoaringBitmap,
context: Option<Object>,
code: String,
}
pub struct UpdateByFunctionChanges<'doc> {
primary_key: &'doc PrimaryKey<'doc>,
engine: Engine,
ast: AST,
context: Option<Dynamic>,
// It is sad that the RoaringBitmap doesn't
// implement IndexedParallelIterator
documents: Vec<u32>,
}
impl UpdateByFunction {
pub fn new(documents: RoaringBitmap, context: Option<Object>, code: String) -> Self {
UpdateByFunction { documents, context, code }
}
pub fn into_changes<'index>(
self,
primary_key: &'index PrimaryKey,
) -> Result<UpdateByFunctionChanges<'index>> {
let Self { documents, context, code } = self;
// Set up the security and limits of the Engine
let mut engine = Engine::new();
engine.set_optimization_level(OptimizationLevel::Full);
engine.set_max_call_levels(1000);
// It is an arbitrary value. We need to let users define this in the settings.
engine.set_max_operations(1_000_000);
engine.set_max_variables(1000);
engine.set_max_functions(30);
engine.set_max_expr_depths(100, 1000);
engine.set_max_string_size(1024 * 1024 * 1024); // 1 GiB
engine.set_max_array_size(10_000);
engine.set_max_map_size(10_000);
let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?;
let context = match context {
Some(context) => {
Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?)
}
None => None,
};
Ok(UpdateByFunctionChanges {
primary_key,
engine,
ast,
context,
documents: documents.into_iter().collect(),
})
}
}
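// Usage sketch (illustration only, mirroring the Meilisearch "edit documents
// by function" feature): setting `doc` to the unit value deletes the document.
//
//     let code = r#"if doc.id % 2 == 0 { doc = () } else { doc.edited = true }"#;
//     let changes = UpdateByFunction::new(docids, None, code.to_string())
//         .into_changes(&primary_key)?;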
impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
type Item = u32;
fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
self.documents.par_iter().copied()
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&self,
context: &'doc DocumentChangeContext<T>,
docid: Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
'index: 'doc,
{
let DocumentChangeContext {
index,
db_fields_ids_map,
txn,
new_fields_ids_map,
doc_alloc,
..
} = context;
// safety: the documents *must* exist in the database, as
// their IDs come from the list of document ids.
let document = index.document(txn, docid)?;
let rhai_document = obkv_to_rhaimap(document, db_fields_ids_map)?;
let json_document = all_obkv_to_json(document, db_fields_ids_map)?;
let document_id = self
.primary_key
.document_id(document, db_fields_ids_map)?
.map_err(|_| InvalidDocumentFormat)?;
let mut scope = Scope::new();
if let Some(context) = self.context.clone() {
scope.push_constant_dynamic("context", context);
}
scope.push("doc", rhai_document);
// We run the user script, which edits the "doc" scope variable representing
// the document, and ignore the output, whatever its type (i.e., Dynamic).
let _ = self
.engine
.eval_ast_with_scope::<Dynamic>(&mut scope, &self.ast)
.map_err(UserError::DocumentEditionRuntimeError)?;
match scope.remove::<Dynamic>("doc") {
// If the "doc" variable has been set to (), we effectively delete the document.
Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create(
docid,
doc_alloc.alloc_str(&document_id),
)))),
None => unreachable!("missing doc variable from the Rhai scope"),
Some(new_document) => match new_document.try_cast() {
Some(new_rhai_document) => {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
serde_json::to_writer(&mut buffer, &new_rhai_document)
.map_err(InternalError::SerdeJson)?;
let raw_new_doc = serde_json::from_slice(buffer.into_bump_slice())
.map_err(InternalError::SerdeJson)?;
// Note: This condition is not perfect. Sometimes it detects changes,
// e.g. with floating-point numbers, and considers updating
// the document even if nothing actually changed.
//
// Future: Use a custom rhai function to track changes.
// <https://docs.rs/rhai/latest/rhai/struct.Engine.html#method.register_indexer_set>
if json_document != rhaimap_to_object(new_rhai_document) {
let mut global_fields_ids_map = new_fields_ids_map.borrow_mut_or_yield();
let new_document_id = self
.primary_key
.extract_fields_and_docid(
raw_new_doc,
&mut *global_fields_ids_map,
doc_alloc,
)?
.to_de();
if document_id != new_document_id {
Err(Error::UserError(UserError::DocumentEditionCannotModifyPrimaryKey))
} else {
let raw_new_doc = RawMap::from_raw_value(raw_new_doc, doc_alloc)
.map_err(InternalError::SerdeJson)?;
let new_doc_version = DocumentFromVersions::new(Versions::Single(
raw_new_doc.into_bump_slice(),
));
Ok(Some(DocumentChange::Update(Update::create(
docid,
new_document_id,
new_doc_version,
true, // It is like document replacement
))))
}
} else {
Ok(None)
}
}
None => Err(Error::UserError(UserError::DocumentEditionDocumentMustBeObject)),
},
}
}
}
fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
let map: Result<rhai::Map> = all_keys
.iter()
.copied()
.flat_map(|id| obkv.get(id).map(|value| (id, value)))
.map(|(id, value)| {
let name = fields_ids_map.name(id).ok_or(FieldIdMapMissingEntry::FieldId {
field_id: id,
process: "all_obkv_to_rhaimap",
})?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
Ok((name.into(), value))
})
.collect();
map
}
fn rhaimap_to_object(map: rhai::Map) -> Object {
let mut output = Object::new();
for (key, value) in map {
let value = serde_json::to_value(&value).unwrap();
output.insert(key.into(), value);
}
output
}