mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-27 05:37:31 +01:00
Replace the concurrent vec by a linked list
This commit is contained in:
parent
35f78b5423
commit
0409a26cd8
170
milli/src/update/new/append_only_linked_list.rs
Normal file
170
milli/src/update/new/append_only_linked_list.rs
Normal file
@ -0,0 +1,170 @@
|
||||
use std::fmt;
|
||||
use std::mem::{self, ManuallyDrop};
|
||||
use std::sync::atomic::AtomicPtr;
|
||||
|
||||
/// An append-only linked-list that returns a mutable references to the pushed items.
|
||||
pub struct AppendOnlyLinkedList<T> {
|
||||
head: AtomicPtr<Node<T>>,
|
||||
}
|
||||
|
||||
struct Node<T> {
|
||||
item: ManuallyDrop<T>,
|
||||
parent: AtomicPtr<Node<T>>,
|
||||
}
|
||||
|
||||
impl<T> AppendOnlyLinkedList<T> {
|
||||
/// Creates an empty list.
|
||||
pub fn new() -> AppendOnlyLinkedList<T> {
|
||||
AppendOnlyLinkedList { head: AtomicPtr::default() }
|
||||
}
|
||||
|
||||
/// Pushes the item at the front of the linked-list and returns a unique and mutable reference to it.
|
||||
#[allow(clippy::mut_from_ref)] // the mut ref is derived from T and unique each time
|
||||
pub fn push(&self, item: T) -> &mut T {
|
||||
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
|
||||
|
||||
let node = Box::leak(Box::new(Node {
|
||||
item: ManuallyDrop::new(item),
|
||||
parent: AtomicPtr::default(),
|
||||
}));
|
||||
|
||||
let mut head = self.head.load(SeqCst);
|
||||
loop {
|
||||
std::hint::spin_loop();
|
||||
match self.head.compare_exchange_weak(head, node, SeqCst, Relaxed) {
|
||||
Ok(parent) => {
|
||||
node.parent = AtomicPtr::new(parent);
|
||||
break;
|
||||
}
|
||||
Err(new) => head = new,
|
||||
}
|
||||
}
|
||||
|
||||
&mut node.item
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for AppendOnlyLinkedList<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for AppendOnlyLinkedList<T> {
|
||||
fn drop(&mut self) {
|
||||
// Let's use the drop implementation of the IntoIter struct
|
||||
IntoIter(mem::take(&mut self.head));
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for AppendOnlyLinkedList<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("AppendOnlyLinkedList").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> IntoIterator for AppendOnlyLinkedList<T> {
|
||||
type Item = T;
|
||||
type IntoIter = IntoIter<T>;
|
||||
|
||||
fn into_iter(mut self) -> Self::IntoIter {
|
||||
IntoIter(mem::take(&mut self.head))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IntoIter<T>(AtomicPtr<Node<T>>);
|
||||
|
||||
impl<T> Iterator for IntoIter<T> {
|
||||
type Item = T;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let ptr = *self.0.get_mut();
|
||||
if ptr.is_null() {
|
||||
None
|
||||
} else {
|
||||
let mut node = unsafe { Box::from_raw(ptr) };
|
||||
// Let's set the next node to read to be the parent of this one
|
||||
self.0 = node.parent;
|
||||
// ...and take the item from the Node before it is dropped
|
||||
let item = unsafe { ManuallyDrop::take(&mut node.item) };
|
||||
Some(item)
|
||||
// ...then drop the Node itself
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for IntoIter<T> {
|
||||
fn drop(&mut self) {
|
||||
let mut ptr = *self.0.get_mut();
|
||||
while !ptr.is_null() {
|
||||
let mut node = unsafe { Box::from_raw(ptr) };
|
||||
// Let's set the next node to read to be the parent of this one
|
||||
ptr = *node.parent.get_mut();
|
||||
// ...and drop the item ourselves.
|
||||
unsafe { ManuallyDrop::drop(&mut node.item) }
|
||||
// ...then drop the Node itself
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parallel_pushing() {
|
||||
use std::sync::Arc;
|
||||
let v = Arc::new(AppendOnlyLinkedList::<u64>::new());
|
||||
let mut threads = Vec::new();
|
||||
const N: u64 = 100;
|
||||
for thread_num in 0..N {
|
||||
let v = v.clone();
|
||||
threads.push(std::thread::spawn(move || {
|
||||
let which1 = v.push(thread_num);
|
||||
let which2 = v.push(thread_num);
|
||||
assert_eq!(*which1, thread_num);
|
||||
assert_eq!(*which2, thread_num);
|
||||
}));
|
||||
}
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
let v = Arc::into_inner(v).unwrap().into_iter().collect::<Vec<_>>();
|
||||
for thread_num in (0..N).rev() {
|
||||
assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_vec() {
|
||||
struct SafeToDrop(bool);
|
||||
|
||||
impl Drop for SafeToDrop {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.0);
|
||||
}
|
||||
}
|
||||
|
||||
let v = AppendOnlyLinkedList::new();
|
||||
|
||||
for _ in 0..50 {
|
||||
v.push(SafeToDrop(false));
|
||||
}
|
||||
|
||||
let mut v = v.into_iter().collect::<Vec<_>>();
|
||||
assert_eq!(v.len(), 50);
|
||||
|
||||
for i in v.iter_mut() {
|
||||
i.0 = true;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_then_index_mut() {
|
||||
let v = AppendOnlyLinkedList::<usize>::new();
|
||||
let mut w = Vec::new();
|
||||
for i in 0..1024 {
|
||||
*v.push(i) += 1;
|
||||
w.push(i + 1);
|
||||
}
|
||||
|
||||
let mut v = v.into_iter().collect::<Vec<_>>();
|
||||
v.reverse();
|
||||
assert_eq!(v, w);
|
||||
}
|
@ -1,327 +0,0 @@
|
||||
// Code taken from <https://github.com/droundy/append-only-vec/blob/main/src/lib.rs>
|
||||
// and modified in order to get a ref mut instead of the index of newly inserted items.
|
||||
|
||||
//! AppendOnlyVec
|
||||
//!
|
||||
//! This is a pretty simple type, which is a vector that you can push into and
|
||||
//! receive a reference to the item you just inserted. The data structure never
|
||||
//! moves an element once allocated, so you can push to the vec even while holding
|
||||
//! mutable references to elements that have already been pushed.
|
||||
//!
|
||||
//! ### Scaling
|
||||
//!
|
||||
//! 1. Accessing an element is O(1), but slightly more expensive than for a
|
||||
//! standard `Vec`.
|
||||
//!
|
||||
//! 2. Pushing a new element amortizes to O(1), but may require allocation of a
|
||||
//! new chunk.
|
||||
//!
|
||||
//! ### Example
|
||||
//!
|
||||
//! ```
|
||||
//! use append_only_vec::AppendOnlyVec;
|
||||
//!
|
||||
//! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new();
|
||||
//! let mut threads = Vec::new();
|
||||
//! for thread_num in 0..10 {
|
||||
//! threads.push(std::thread::spawn(move || {
|
||||
//! for n in 0..100 {
|
||||
//! let s = format!("thread {} says {}", thread_num, n);
|
||||
//! let which = V.push(s.clone());
|
||||
//! assert_eq!(&which, &s);
|
||||
//! }
|
||||
//! }));
|
||||
//! }
|
||||
//!
|
||||
//! for t in threads {
|
||||
//! t.join();
|
||||
//! }
|
||||
//!
|
||||
//! assert_eq!(V.len(), 1000);
|
||||
//! ```
|
||||
|
||||
use std::cell::UnsafeCell;
|
||||
use std::fmt::Debug;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
pub struct AppendOnlyVec<T> {
|
||||
count: AtomicUsize,
|
||||
_reserved: AtomicUsize,
|
||||
data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3],
|
||||
}
|
||||
|
||||
unsafe impl<T: Send> Send for AppendOnlyVec<T> {}
|
||||
unsafe impl<T: Sync + Send> Sync for AppendOnlyVec<T> {}
|
||||
|
||||
const BITS: usize = std::mem::size_of::<usize>() * 8;
|
||||
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
const BITS_USED: usize = 48;
|
||||
#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))]
|
||||
const BITS_USED: usize = 64;
|
||||
#[cfg(target_pointer_width = "32")]
|
||||
const BITS_USED: usize = 32;
|
||||
|
||||
// This takes an index into a vec, and determines which data array will hold it
|
||||
// (the first return value), and what the index will be into that data array
|
||||
// (second return value)
|
||||
//
|
||||
// The ith data array holds 1<<i values.
|
||||
const fn indices(i: usize) -> (u32, usize) {
|
||||
let i = i + 8;
|
||||
let bin = BITS as u32 - 1 - i.leading_zeros();
|
||||
let bin = bin - 3;
|
||||
let offset = i - bin_size(bin);
|
||||
(bin, offset)
|
||||
}
|
||||
|
||||
const fn bin_size(array: u32) -> usize {
|
||||
(1 << 3) << array
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_indices() {
|
||||
for i in 0..32 {
|
||||
println!("{:3}: {} {}", i, indices(i).0, indices(i).1);
|
||||
}
|
||||
let mut array = 0;
|
||||
let mut offset = 0;
|
||||
let mut index = 0;
|
||||
while index < 1000 {
|
||||
index += 1;
|
||||
offset += 1;
|
||||
if offset >= bin_size(array) {
|
||||
offset = 0;
|
||||
array += 1;
|
||||
}
|
||||
assert_eq!(indices(index), (array, offset));
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AppendOnlyVec<T> {
|
||||
const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut());
|
||||
|
||||
/// Allocate a new empty array.
|
||||
pub const fn new() -> Self {
|
||||
AppendOnlyVec {
|
||||
count: AtomicUsize::new(0),
|
||||
_reserved: AtomicUsize::new(0),
|
||||
data: [Self::EMPTY; BITS_USED - 1 - 3],
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the length of the array.
|
||||
#[inline]
|
||||
pub fn len(&self) -> usize {
|
||||
self.count.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
fn layout(array: u32) -> std::alloc::Layout {
|
||||
std::alloc::Layout::array::<T>(bin_size(array)).unwrap()
|
||||
}
|
||||
|
||||
/// Append an element to the array and get a mutable ref to it.
|
||||
///
|
||||
/// This is notable in that it doesn't require a `&mut self`, because it
|
||||
/// does appropriate atomic synchronization.
|
||||
pub fn push(&self, val: T) -> &mut T {
|
||||
let idx = self._reserved.fetch_add(1, Ordering::Relaxed);
|
||||
let (array, offset) = indices(idx);
|
||||
let ptr = if self.len() < 1 + idx - offset {
|
||||
// We are working on a new array, which may not have been allocated...
|
||||
if offset == 0 {
|
||||
// It is our job to allocate the array! The size of the array
|
||||
// is determined in the self.layout method, which needs to be
|
||||
// consistent with the indices function.
|
||||
let layout = Self::layout(array);
|
||||
let ptr = unsafe { std::alloc::alloc(layout) } as *mut T;
|
||||
unsafe {
|
||||
*self.data[array as usize].get() = ptr;
|
||||
}
|
||||
ptr
|
||||
} else {
|
||||
// We need to wait for the array to be allocated.
|
||||
while self.len() < 1 + idx - offset {
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
// The Ordering::Acquire semantics of self.len() ensures that
|
||||
// this pointer read will get the non-null pointer allocated
|
||||
// above.
|
||||
unsafe { *self.data[array as usize].get() }
|
||||
}
|
||||
} else {
|
||||
// The Ordering::Acquire semantics of self.len() ensures that
|
||||
// this pointer read will get the non-null pointer allocated
|
||||
// above.
|
||||
unsafe { *self.data[array as usize].get() }
|
||||
};
|
||||
|
||||
// The contents of this offset are guaranteed to be unused (so far)
|
||||
// because we got the idx from our fetch_add above, and ptr is
|
||||
// guaranteed to be valid because of the loop we used above, which used
|
||||
// self.len() which has Ordering::Acquire semantics.
|
||||
unsafe { (ptr.add(offset)).write(val) };
|
||||
|
||||
// Now we need to increase the size of the vec, so it can get read. We
|
||||
// use Release upon success, to ensure that the value which we wrote is
|
||||
// visible to any thread that has confirmed that the count is big enough
|
||||
// to read that element. In case of failure, we can be relaxed, since
|
||||
// we don't do anything with the result other than try again.
|
||||
while self
|
||||
.count
|
||||
.compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed)
|
||||
.is_err()
|
||||
{
|
||||
// This means that someone else *started* pushing before we started,
|
||||
// but hasn't yet finished. We have to wait for them to finish
|
||||
// pushing before we can update the count. Note that using a
|
||||
// spinloop here isn't really ideal, but except when allocating a
|
||||
// new array, the window between reserving space and using it is
|
||||
// pretty small, so contention will hopefully be rare, and having a
|
||||
// context switch during that interval will hopefully be vanishingly
|
||||
// unlikely.
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
|
||||
unsafe { &mut *ptr }
|
||||
}
|
||||
|
||||
/// Convert into a standard `Vec`.
|
||||
pub fn into_vec(self) -> Vec<T> {
|
||||
let mut vec = Vec::with_capacity(self.len());
|
||||
|
||||
for idx in 0..self.len() {
|
||||
let (array, offset) = indices(idx);
|
||||
// We use a Relaxed load of the pointer, because the loop above (which
|
||||
// ends before `self.len()`) should ensure that the data we want is
|
||||
// already visible, since it Acquired `self.count` which synchronizes
|
||||
// with the write in `self.push`.
|
||||
let ptr = unsafe { *self.data[array as usize].get() };
|
||||
|
||||
// Copy the element value. The copy remaining in the array must not
|
||||
// be used again (i.e. make sure we do not drop it)
|
||||
let value = unsafe { ptr.add(offset).read() };
|
||||
|
||||
vec.push(value);
|
||||
}
|
||||
|
||||
// Prevent dropping the copied-out values by marking the count as 0 before
|
||||
// our own drop is run
|
||||
self.count.store(0, Ordering::Relaxed);
|
||||
|
||||
vec
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for AppendOnlyVec<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Debug for AppendOnlyVec<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for AppendOnlyVec<T> {
|
||||
fn drop(&mut self) {
|
||||
// First we'll drop all the `T` in a slightly sloppy way. FIXME this
|
||||
// could be optimized to avoid reloading the `ptr`.
|
||||
for idx in 0..self.len() {
|
||||
let (array, offset) = indices(idx);
|
||||
// We use a Relaxed load of the pointer, because the loop above (which
|
||||
// ends before `self.len()`) should ensure that the data we want is
|
||||
// already visible, since it Acquired `self.count` which synchronizes
|
||||
// with the write in `self.push`.
|
||||
let ptr = unsafe { *self.data[array as usize].get() };
|
||||
unsafe {
|
||||
ptr::drop_in_place(ptr.add(offset));
|
||||
}
|
||||
}
|
||||
// Now we will free all the arrays.
|
||||
for array in 0..self.data.len() as u32 {
|
||||
// This load is relaxed because no other thread can have a reference
|
||||
// to Self because we have a &mut self.
|
||||
let ptr = unsafe { *self.data[array as usize].get() };
|
||||
if !ptr.is_null() {
|
||||
let layout = Self::layout(array);
|
||||
unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> IntoIterator for AppendOnlyVec<T> {
|
||||
type Item = T;
|
||||
type IntoIter = std::vec::IntoIter<T>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.into_vec().into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parallel_pushing() {
|
||||
use std::sync::Arc;
|
||||
let v = Arc::new(AppendOnlyVec::<u64>::new());
|
||||
let mut threads = Vec::new();
|
||||
const N: u64 = 100;
|
||||
for thread_num in 0..N {
|
||||
let v = v.clone();
|
||||
threads.push(std::thread::spawn(move || {
|
||||
let which1 = v.push(thread_num);
|
||||
let which2 = v.push(thread_num);
|
||||
assert_eq!(*which1, thread_num);
|
||||
assert_eq!(*which2, thread_num);
|
||||
}));
|
||||
}
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
let v = Arc::into_inner(v).unwrap().into_vec();
|
||||
for thread_num in 0..N {
|
||||
assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_into_vec() {
|
||||
struct SafeToDrop(bool);
|
||||
|
||||
impl Drop for SafeToDrop {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.0);
|
||||
}
|
||||
}
|
||||
|
||||
let v = AppendOnlyVec::new();
|
||||
|
||||
for _ in 0..50 {
|
||||
v.push(SafeToDrop(false));
|
||||
}
|
||||
|
||||
let mut v = v.into_vec();
|
||||
assert_eq!(v.len(), 50);
|
||||
|
||||
for i in v.iter_mut() {
|
||||
i.0 = true;
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_then_index_mut() {
|
||||
let v = AppendOnlyVec::<usize>::new();
|
||||
for i in 0..1024 {
|
||||
*v.push(i) += 1;
|
||||
}
|
||||
|
||||
let v = v.into_vec();
|
||||
for i in 0..1024 {
|
||||
assert_eq!(v[i], 2 * i);
|
||||
}
|
||||
}
|
@ -12,7 +12,7 @@ use super::super::cache::CboCachedSorter;
|
||||
use super::facet_document::extract_document_facets;
|
||||
use super::FacetKind;
|
||||
use crate::facet::value_encoding::f64_into_bytes;
|
||||
use crate::update::new::append_only_vec::AppendOnlyVec;
|
||||
use crate::update::new::append_only_linked_list::AppendOnlyLinkedList;
|
||||
use crate::update::new::extract::DocidsExtractor;
|
||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
|
||||
use crate::update::new::DocumentChange;
|
||||
@ -210,7 +210,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
|
||||
let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
|
||||
let attributes_to_extract: Vec<_> =
|
||||
attributes_to_extract.iter().map(|s| s.as_ref()).collect();
|
||||
let caches = AppendOnlyVec::new();
|
||||
let caches = AppendOnlyLinkedList::new();
|
||||
|
||||
{
|
||||
let span =
|
||||
|
@ -9,7 +9,7 @@ use rayon::iter::IntoParallelIterator;
|
||||
|
||||
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
||||
use super::SearchableExtractor;
|
||||
use crate::update::new::append_only_vec::AppendOnlyVec;
|
||||
use crate::update::new::append_only_linked_list::AppendOnlyLinkedList;
|
||||
use crate::update::new::extract::cache::CboCachedSorter;
|
||||
use crate::update::new::extract::perm_json_p::contained_in;
|
||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
|
||||
@ -341,7 +341,7 @@ impl WordDocidsExtractors {
|
||||
max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
|
||||
};
|
||||
|
||||
let caches = AppendOnlyVec::new();
|
||||
let caches = AppendOnlyLinkedList::new();
|
||||
|
||||
{
|
||||
let span =
|
||||
|
@ -14,7 +14,7 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
||||
|
||||
use super::cache::CboCachedSorter;
|
||||
use super::DocidsExtractor;
|
||||
use crate::update::new::append_only_vec::AppendOnlyVec;
|
||||
use crate::update::new::append_only_linked_list::AppendOnlyLinkedList;
|
||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
|
||||
use crate::update::new::DocumentChange;
|
||||
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
|
||||
@ -58,7 +58,7 @@ pub trait SearchableExtractor {
|
||||
localized_attributes_rules: &localized_attributes_rules,
|
||||
max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
|
||||
};
|
||||
let caches = AppendOnlyVec::new();
|
||||
let caches = AppendOnlyLinkedList::new();
|
||||
|
||||
{
|
||||
let span =
|
||||
|
@ -4,7 +4,7 @@ pub use top_level_map::{CowStr, TopLevelMap};
|
||||
use super::del_add::DelAdd;
|
||||
use crate::FieldId;
|
||||
|
||||
mod append_only_vec;
|
||||
mod append_only_linked_list;
|
||||
mod channel;
|
||||
mod document_change;
|
||||
mod extract;
|
||||
|
Loading…
x
Reference in New Issue
Block a user