use std::cell::{Cell, RefCell};
use std::sync::{Arc, RwLock};

use bumpalo::Bump;
use heed::RoTxn;
use rayon::iter::IndexedParallelIterator;

use super::super::document_change::DocumentChange;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};

/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
///
/// The primary example of such a type is `&T`, with `T: !Sync`.
///
/// In the authors' understanding, a type can be `!Send` for two distinct reasons:
///
/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data.
/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior.
///
/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread
/// because you might access it from a different thread, but where you will never access the type **concurrently** from
/// multiple threads.
///
/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing
/// this trait is unsafe.
///
/// # Safety
///
/// Implementers of this trait promise that the following properties hold on the implementing type:
///
/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it.
/// 2. Any operation that can be performed on the type does not depend on the thread that executes it.
///
/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before
/// implementing `MostlySend` on a type, especially a foreign type.
///
/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`).
/// - An example of a type that doesn't verify (1) is thread-local data.
/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that
///   a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this
///   invariant will cause Undefined Behavior
///   (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)).
///
/// It is **always safe** to implement this trait on a type that is `Send`, but no blanket impl is provided due to limitations in
/// coherency. Use the [`FullySend`] wrapper in this situation.
pub unsafe trait MostlySend {}

/// Transparent wrapper that turns any [`Send`] value into a [`MostlySend`] one.
///
/// The wrapped value is reachable through the public `.0` field, [`FullySend::into`]
/// or the [`From`] conversion.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct FullySend<T>(pub T);

// SAFETY: a type **fully** send is always mostly send as well.
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}

// SAFETY: `RefCell` only adds a borrow flag stored next to the value, so it upholds (1) and (2)
// whenever the wrapped `T` does.
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}

// SAFETY: `Option` adds nothing but a discriminant around `T`, so it upholds (1) and (2)
// whenever the wrapped `T` does.
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}

impl<T> FullySend<T> {
    /// Unwraps the inner value.
    pub fn into(self) -> T {
        self.0
    }
}

impl<T> From<T> for FullySend<T> {
    /// Wraps `value` without touching it.
    fn from(value: T) -> Self {
        Self(value)
    }
}

/// Private newtype that is unconditionally [`Send`] for [`MostlySend`] payloads.
///
/// Constructing it is `unsafe`: the caller takes over the "never accessed concurrently"
/// obligation that the compiler can no longer check.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct MostlySendWrapper<T>(T);

impl<T: MostlySend> MostlySendWrapper<T> {
    /// # Safety
    ///
    /// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization
    unsafe fn new(t: T) -> Self {
        Self(t)
    }

    /// Shared access to the wrapped value.
    fn as_ref(&self) -> &T {
        &self.0
    }

    /// Exclusive access to the wrapped value.
    fn as_mut(&mut self) -> &mut T {
        &mut self.0
    }

    /// Consumes the wrapper and returns the wrapped value.
    fn into_inner(self) -> T {
        self.0
    }
}

/// # Safety
///
/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available
///    from any thread.
/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently.
unsafe impl<T: MostlySend> Send for MostlySendWrapper<T> {}

/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s.
#[derive(Default)] pub struct ThreadLocal { inner: thread_local::ThreadLocal>, // FIXME: this should be necessary //_no_send: PhantomData<*mut ()>, } impl ThreadLocal { pub fn new() -> Self { Self { inner: thread_local::ThreadLocal::new() } } pub fn with_capacity(capacity: usize) -> Self { Self { inner: thread_local::ThreadLocal::with_capacity(capacity) } } pub fn clear(&mut self) { self.inner.clear() } pub fn get(&self) -> Option<&T> { self.inner.get().map(|t| t.as_ref()) } pub fn get_or(&self, create: F) -> &T where F: FnOnce() -> T, { /// TODO: move ThreadLocal, MostlySend, FullySend to a dedicated file self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref() } pub fn get_or_try(&self, create: F) -> std::result::Result<&T, E> where F: FnOnce() -> std::result::Result, { self.inner .get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) }) .map(MostlySendWrapper::as_ref) } pub fn get_or_default(&self) -> &T where T: Default, { self.inner.get_or_default().as_ref() } pub fn iter_mut(&mut self) -> IterMut { IterMut(self.inner.iter_mut()) } } impl IntoIterator for ThreadLocal { type Item = T; type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { IntoIter(self.inner.into_iter()) } } pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper>); impl<'a, T: MostlySend> Iterator for IterMut<'a, T> { type Item = &'a mut T; fn next(&mut self) -> Option { self.0.next().map(|t| t.as_mut()) } } pub struct IntoIter(thread_local::IntoIter>); impl Iterator for IntoIter { type Item = T; fn next(&mut self) -> Option { self.0.next().map(|t| t.into_inner()) } } pub struct DocumentChangeContext< 'doc, // covariant lifetime of a single `process` call 'extractor: 'doc, // invariant lifetime of the extractor_allocs 'fid: 'doc, // invariant lifetime of the new_fields_ids_map 'indexer: 'doc, // covariant lifetime of objects that outlive a single `process` call T: MostlySend, > { /// The index we're indexing in pub index: &'indexer 
Index, /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents /// inside of the DB. pub db_fields_ids_map: &'indexer FieldsIdsMap, /// A transaction providing data from the DB before all indexing operations pub txn: RoTxn<'indexer>, /// Global field id map that is up to date with the current state of the indexing process. /// /// - Inserting a field will take a lock /// - Retrieving a field may take a lock as well pub new_fields_ids_map: &'doc std::cell::RefCell>, /// Data allocated in this allocator is cleared between each call to `process`. pub doc_alloc: Bump, /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills. pub extractor_alloc: &'extractor Bump, /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents doc_allocs: &'doc ThreadLocal>>, /// Extractor-specific data pub data: &'doc T, } impl< 'doc, // covariant lifetime of a single `process` call 'data: 'doc, // invariant on T lifetime of the datastore 'extractor: 'doc, // invariant lifetime of extractor_allocs 'fid: 'doc, // invariant lifetime of fields ids map 'indexer: 'doc, // covariant lifetime of objects that survive a `process` call T: MostlySend, > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T> { #[allow(clippy::too_many_arguments)] pub fn new( index: &'indexer Index, db_fields_ids_map: &'indexer FieldsIdsMap, new_fields_ids_map: &'fid RwLock, extractor_allocs: &'extractor ThreadLocal>, doc_allocs: &'doc ThreadLocal>>, datastore: &'data ThreadLocal, fields_ids_map_store: &'doc ThreadLocal>>>, init_data: F, ) -> Result where F: FnOnce(&'extractor Bump) -> Result, { let doc_alloc = doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024 * 1024)))); let doc_alloc = doc_alloc.0.take(); let fields_ids_map = fields_ids_map_store .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into()); let 
fields_ids_map = &fields_ids_map.0; let extractor_alloc = extractor_allocs.get_or_default(); let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?; let txn = index.read_txn()?; Ok(DocumentChangeContext { index, txn, db_fields_ids_map, new_fields_ids_map: fields_ids_map, doc_alloc, extractor_alloc: &extractor_alloc.0, data, doc_allocs, }) } } /// An internal iterator (i.e. using `foreach`) of `DocumentChange`s pub trait Extractor<'extractor>: Sync { type Data: MostlySend; fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result; fn process<'doc>( &'doc self, changes: impl Iterator>>, context: &'doc DocumentChangeContext, ) -> Result<()>; } pub trait DocumentChanges<'pl // lifetime of the underlying payload >: Sync { type Item: Send; fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator>; fn len(&self) -> usize; fn is_empty(&self) -> bool { self.len() == 0 } fn item_to_document_change<'doc, // lifetime of a single `process` call T: MostlySend>( &'doc self, context: &'doc DocumentChangeContext, item: &'doc Self::Item, ) -> Result>> where 'pl: 'doc // the payload must survive the process calls ; } pub struct IndexingContext< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, SP, > where MSP: Fn() -> bool + Sync, SP: Fn(Progress) + Sync, { pub index: &'index Index, pub db_fields_ids_map: &'indexer FieldsIdsMap, pub new_fields_ids_map: &'fid RwLock, pub doc_allocs: &'indexer ThreadLocal>>, pub fields_ids_map_store: &'indexer ThreadLocal>>>, pub must_stop_processing: &'indexer MSP, pub send_progress: &'indexer SP, } impl< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, SP, > Copy for IndexingContext< 'fid, // invariant lifetime of fields ids 
map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, SP, > where MSP: Fn() -> bool + Sync, SP: Fn(Progress) + Sync, { } impl< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, SP, > Clone for IndexingContext< 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation 'index, // covariant lifetime of the index MSP, SP, > where MSP: Fn() -> bool + Sync, SP: Fn(Progress) + Sync, { fn clone(&self) -> Self { *self } } const CHUNK_SIZE: usize = 100; #[allow(clippy::too_many_arguments)] pub fn extract< 'pl, // covariant lifetime of the underlying payload 'extractor, // invariant lifetime of extractor_alloc 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing 'data, // invariant on EX::Data lifetime of datastore 'index, // covariant lifetime of the index EX, DC: DocumentChanges<'pl>, MSP, SP, >( document_changes: &DC, extractor: &EX, IndexingContext { index, db_fields_ids_map, new_fields_ids_map, doc_allocs, fields_ids_map_store, must_stop_processing, send_progress, }: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, finished_steps: u16, total_steps: u16, step_name: &'static str, ) -> Result<()> where EX: Extractor<'extractor>, MSP: Fn() -> bool + Sync, SP: Fn(Progress) + Sync, { eprintln!("We are resetting the extractor allocators"); // Clean up and reuse the extractor allocs for extractor_alloc in extractor_allocs.iter_mut() { eprintln!("\tWith {} bytes resetted", extractor_alloc.0.allocated_bytes()); extractor_alloc.0.reset(); } let total_documents = document_changes.len(); let pi = 
document_changes.iter(CHUNK_SIZE); pi.enumerate().try_arc_for_each_try_init( || { DocumentChangeContext::new( index, db_fields_ids_map, new_fields_ids_map, extractor_allocs, doc_allocs, datastore, fields_ids_map_store, move |index_alloc| extractor.init_data(index_alloc), ) }, |context, (finished_documents, items)| { if (must_stop_processing)() { return Err(Arc::new(InternalError::AbortedIndexation.into())); } let finished_documents = finished_documents * CHUNK_SIZE; (send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: Some((finished_documents as u32, total_documents as u32)), }); // Clean up and reuse the document-specific allocator context.doc_alloc.reset(); let items = items.as_ref(); let changes = items.iter().filter_map(|item| { document_changes.item_to_document_change(context, item).transpose() }); let res = extractor.process(changes, context).map_err(Arc::new); // send back the doc_alloc in the pool context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); res }, )?; (send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: Some((total_documents as u32, total_documents as u32)), }); Ok(()) } pub struct Progress { pub finished_steps: u16, pub total_steps: u16, pub step_name: &'static str, pub finished_total_documents: Option<(u32, u32)>, }