Mutualize tokenization

This commit is contained in:
ManyTheFish 2024-09-11 10:20:23 +02:00
parent 3848adf5a2
commit 39b5990f64
6 changed files with 512 additions and 73 deletions

View File

@ -38,7 +38,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
match document_change {
DocumentChange::Deletion(inner) => {
let mut fid_word_count = HashMap::new();
let mut token_fn = |fid: FieldId, _pos: u16, _word: &str| {
let mut token_fn = |_fname: &str, fid: FieldId, _pos: u16, _word: &str| {
fid_word_count.entry(fid).and_modify(|count| *count += 1).or_insert(1);
Ok(())
};
@ -58,7 +58,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
}
DocumentChange::Update(inner) => {
let mut fid_word_count = HashMap::new();
let mut token_fn = |fid: FieldId, _pos: u16, _word: &str| {
let mut token_fn = |_fname: &str, fid: FieldId, _pos: u16, _word: &str| {
fid_word_count
.entry(fid)
.and_modify(|(current_count, _new_count)| *current_count += 1)
@ -71,7 +71,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
&mut token_fn,
)?;
let mut token_fn = |fid: FieldId, _pos: u16, _word: &str| {
let mut token_fn = |_fname: &str, fid: FieldId, _pos: u16, _word: &str| {
fid_word_count
.entry(fid)
.and_modify(|(_current_count, new_count)| *new_count += 1)
@ -96,7 +96,7 @@ impl SearchableExtractor for FidWordCountDocidsExtractor {
}
DocumentChange::Insertion(inner) => {
let mut fid_word_count = HashMap::new();
let mut token_fn = |fid: FieldId, _pos: u16, _word: &str| {
let mut token_fn = |_fname: &str, fid: FieldId, _pos: u16, _word: &str| {
fid_word_count.entry(fid).and_modify(|count| *count += 1).or_insert(1);
Ok(())
};

View File

@ -1,17 +1,30 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::{borrow::Cow, fs::File, num::NonZero};
use grenad::Merger;
use grenad::MergerBuilder;
use heed::RoTxn;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use super::{tokenize_document::DocumentTokenizer, SearchableExtractor};
use super::{
tokenize_document::{tokenizer_builder, DocumentTokenizer},
SearchableExtractor,
};
use crate::update::new::extract::perm_json_p::contained_in;
use crate::DocumentId;
use crate::{
bucketed_position,
update::{
new::{extract::cache::CboCachedSorter, DocumentChange},
MergeDeladdCboRoaringBitmaps,
create_sorter,
new::{extract::cache::CboCachedSorter, DocumentChange, ItemsPool},
GrenadParameters, MergeDeladdCboRoaringBitmaps,
},
FieldId, GlobalFieldsIdsMap, Index, Result,
FieldId, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE,
};
const MAX_COUNTED_WORDS: usize = 30;
trait ProtoWordDocidsExtractor {
fn build_key(field_id: FieldId, position: u16, word: &str) -> Cow<'_, [u8]>;
fn attributes_to_extract<'a>(
@ -36,7 +49,7 @@ where
) -> Result<()> {
match document_change {
DocumentChange::Deletion(inner) => {
let mut token_fn = |fid, pos: u16, word: &str| {
let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word);
cached_sorter.insert_del_u32(&key, inner.docid()).map_err(crate::Error::from)
};
@ -47,7 +60,7 @@ where
)?;
}
DocumentChange::Update(inner) => {
let mut token_fn = |fid, pos, word: &str| {
let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word);
cached_sorter.insert_del_u32(&key, inner.docid()).map_err(crate::Error::from)
};
@ -57,14 +70,14 @@ where
&mut token_fn,
)?;
let mut token_fn = |fid, pos, word: &str| {
let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word);
cached_sorter.insert_add_u32(&key, inner.docid()).map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
}
DocumentChange::Insertion(inner) => {
let mut token_fn = |fid, pos, word: &str| {
let mut token_fn = |_fname: &str, fid, pos, word: &str| {
let key = Self::build_key(fid, pos, word);
cached_sorter.insert_add_u32(&key, inner.docid()).map_err(crate::Error::from)
};
@ -181,3 +194,435 @@ impl ProtoWordDocidsExtractor for WordPositionDocidsExtractor {
Cow::Owned(key)
}
}
// V2
struct WordDocidsCachedSorters {
word_fid_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
exact_word_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
word_position_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
fid_word_count_docids: CboCachedSorter<MergeDeladdCboRoaringBitmaps>,
fid_word_count: HashMap<FieldId, (usize, usize)>,
current_docid: Option<DocumentId>,
}
impl WordDocidsCachedSorters {
pub fn new(
indexer: GrenadParameters,
max_memory: Option<usize>,
capacity: NonZero<usize>,
) -> Self {
let max_memory = max_memory.map(|max_memory| max_memory / 4);
let word_fid_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let word_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let exact_word_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let word_position_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
let fid_word_count_docids = CboCachedSorter::new(
capacity,
create_sorter(
grenad::SortAlgorithm::Stable,
MergeDeladdCboRoaringBitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
),
);
Self {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
fid_word_count: HashMap::new(),
current_docid: None,
}
}
fn insert_add_u32(
&mut self,
field_id: FieldId,
position: u16,
word: &str,
exact: bool,
docid: u32,
buffer: &mut Vec<u8>,
) -> Result<()> {
let key = word.as_bytes();
if exact {
self.exact_word_docids.insert_add_u32(key, docid)?;
} else {
self.word_docids.insert_add_u32(key, docid)?;
}
buffer.clear();
buffer.extend_from_slice(word.as_bytes());
buffer.push(0);
buffer.extend_from_slice(&position.to_be_bytes());
self.word_fid_docids.insert_add_u32(key, docid)?;
buffer.clear();
buffer.extend_from_slice(word.as_bytes());
buffer.push(0);
buffer.extend_from_slice(&field_id.to_be_bytes());
self.word_position_docids.insert_add_u32(buffer, docid)?;
if self.current_docid.map_or(false, |id| docid != id) {
self.flush_fid_word_count(buffer)?;
}
self.fid_word_count
.entry(field_id)
.and_modify(|(_current_count, new_count)| *new_count += 1)
.or_insert((0, 1));
self.current_docid = Some(docid);
Ok(())
}
fn insert_del_u32(
&mut self,
field_id: FieldId,
position: u16,
word: &str,
exact: bool,
docid: u32,
buffer: &mut Vec<u8>,
) -> Result<()> {
let key = word.as_bytes();
if exact {
self.exact_word_docids.insert_del_u32(key, docid)?;
} else {
self.word_docids.insert_del_u32(key, docid)?;
}
buffer.clear();
buffer.extend_from_slice(word.as_bytes());
buffer.push(0);
buffer.extend_from_slice(&position.to_be_bytes());
self.word_fid_docids.insert_del_u32(key, docid)?;
buffer.clear();
buffer.extend_from_slice(word.as_bytes());
buffer.push(0);
buffer.extend_from_slice(&field_id.to_be_bytes());
self.word_position_docids.insert_del_u32(buffer, docid)?;
if self.current_docid.map_or(false, |id| docid != id) {
self.flush_fid_word_count(buffer)?;
}
self.fid_word_count
.entry(field_id)
.and_modify(|(current_count, _new_count)| *current_count += 1)
.or_insert((1, 0));
self.current_docid = Some(docid);
Ok(())
}
fn flush_fid_word_count(&mut self, buffer: &mut Vec<u8>) -> Result<()> {
for (fid, (current_count, new_count)) in self.fid_word_count.drain() {
if current_count != new_count {
if current_count <= MAX_COUNTED_WORDS {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(current_count as u8);
self.fid_word_count_docids
.insert_del_u32(buffer, self.current_docid.unwrap())?;
}
if new_count <= MAX_COUNTED_WORDS {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(new_count as u8);
self.fid_word_count_docids
.insert_add_u32(buffer, self.current_docid.unwrap())?;
}
}
}
Ok(())
}
}
struct WordDocidsMergerBuilders {
word_fid_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
exact_word_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
word_position_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
fid_word_count_docids: MergerBuilder<File, MergeDeladdCboRoaringBitmaps>,
}
pub struct WordDocidsMergers {
pub word_fid_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub exact_word_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub word_position_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
pub fid_word_count_docids: Merger<File, MergeDeladdCboRoaringBitmaps>,
}
impl WordDocidsMergerBuilders {
fn new() -> Self {
Self {
word_fid_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
exact_word_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
word_position_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
fid_word_count_docids: MergerBuilder::new(MergeDeladdCboRoaringBitmaps),
}
}
fn add_sorters(&mut self, other: WordDocidsCachedSorters) -> Result<()> {
let WordDocidsCachedSorters {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
fid_word_count: _,
current_docid: _,
} = other;
let sorter = word_fid_docids.into_sorter()?;
let readers = sorter.into_reader_cursors()?;
self.word_fid_docids.extend(readers);
let sorter = word_docids.into_sorter()?;
let readers = sorter.into_reader_cursors()?;
self.word_docids.extend(readers);
let sorter = exact_word_docids.into_sorter()?;
let readers = sorter.into_reader_cursors()?;
self.exact_word_docids.extend(readers);
let sorter = word_position_docids.into_sorter()?;
let readers = sorter.into_reader_cursors()?;
self.word_position_docids.extend(readers);
let sorter = fid_word_count_docids.into_sorter()?;
let readers = sorter.into_reader_cursors()?;
self.fid_word_count_docids.extend(readers);
Ok(())
}
fn build(self) -> WordDocidsMergers {
WordDocidsMergers {
word_fid_docids: self.word_fid_docids.build(),
word_docids: self.word_docids.build(),
exact_word_docids: self.exact_word_docids.build(),
word_position_docids: self.word_position_docids.build(),
fid_word_count_docids: self.fid_word_count_docids.build(),
}
}
}
pub struct WordDocidsExtractors;
impl WordDocidsExtractors {
pub fn run_extraction(
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
) -> Result<WordDocidsMergers> {
let max_memory = indexer.max_memory_by_thread();
let rtxn = index.read_txn()?;
let stop_words = index.stop_words(&rtxn)?;
let allowed_separators = index.allowed_separators(&rtxn)?;
let allowed_separators: Option<Vec<_>> =
allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect());
let dictionary = index.dictionary(&rtxn)?;
let dictionary: Option<Vec<_>> =
dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
let builder = tokenizer_builder(
stop_words.as_ref(),
allowed_separators.as_deref(),
dictionary.as_deref(),
);
let tokenizer = builder.into_tokenizer();
let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
let attributes_to_skip = Self::attributes_to_skip(&rtxn, index)?;
let localized_attributes_rules =
index.localized_attributes_rules(&rtxn)?.unwrap_or_default();
let document_tokenizer = DocumentTokenizer {
tokenizer: &tokenizer,
attribute_to_extract: attributes_to_extract.as_deref(),
attribute_to_skip: attributes_to_skip.as_slice(),
localized_attributes_rules: &localized_attributes_rules,
max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
};
let context_pool = ItemsPool::new(|| {
Ok((
index.read_txn()?,
&document_tokenizer,
fields_ids_map.clone(),
WordDocidsCachedSorters::new(
indexer,
max_memory,
// TODO use a better value
200_000.try_into().unwrap(),
),
))
});
document_changes.into_par_iter().try_for_each(|document_change| {
context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| {
Self::extract_document_change(
&*rtxn,
index,
document_tokenizer,
fields_ids_map,
cached_sorter,
document_change?,
)
})
})?;
let mut builder = WordDocidsMergerBuilders::new();
for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() {
builder.add_sorters(cache)?;
}
Ok(builder.build())
}
fn extract_document_change(
rtxn: &RoTxn,
index: &Index,
document_tokenizer: &DocumentTokenizer,
fields_ids_map: &mut GlobalFieldsIdsMap,
cached_sorter: &mut WordDocidsCachedSorters,
document_change: DocumentChange,
) -> Result<()> {
let exact_attributes = index.exact_attributes(&rtxn)?;
let is_exact_attribute =
|fname: &str| exact_attributes.iter().any(|attr| contained_in(fname, attr));
let mut buffer = Vec::new();
match document_change {
DocumentChange::Deletion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_del_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index)?.unwrap(),
fields_ids_map,
&mut token_fn,
)?;
}
DocumentChange::Update(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_del_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(
inner.current(rtxn, index)?.unwrap(),
fields_ids_map,
&mut token_fn,
)?;
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_add_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
}
DocumentChange::Insertion(inner) => {
let mut token_fn = |fname: &str, fid, pos, word: &str| {
cached_sorter
.insert_add_u32(
fid,
pos,
word,
is_exact_attribute(fname),
inner.docid(),
&mut buffer,
)
.map_err(crate::Error::from)
};
document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?;
}
}
cached_sorter.flush_fid_word_count(&mut buffer)
}
fn attributes_to_extract<'a>(
rtxn: &'a RoTxn,
index: &'a Index,
) -> Result<Option<Vec<&'a str>>> {
index.user_defined_searchable_fields(rtxn).map_err(Into::into)
}
fn attributes_to_skip<'a>(_rtxn: &'a RoTxn, _index: &'a Index) -> Result<Vec<&'a str>> {
Ok(vec![])
}
}

View File

@ -149,7 +149,7 @@ fn process_document_tokens(
word_positions: &mut VecDeque<(String, u16)>,
word_pair_proximity: &mut BTreeMap<(String, String), u8>,
) -> Result<()> {
let mut token_fn = |_fid: FieldId, pos: u16, word: &str| {
let mut token_fn = |_fname: &str, _fid: FieldId, pos: u16, word: &str| {
// drain the proximity window until the head word is considered close to the word we are inserting.
while word_positions
.front()

View File

@ -7,8 +7,8 @@ use std::fs::File;
pub use extract_fid_word_count_docids::FidWordCountDocidsExtractor;
pub use extract_word_docids::{
ExactWordDocidsExtractor, WordDocidsExtractor, WordFidDocidsExtractor,
WordPositionDocidsExtractor,
ExactWordDocidsExtractor, WordDocidsExtractor, WordDocidsExtractors, WordDocidsMergers,
WordFidDocidsExtractor, WordPositionDocidsExtractor,
};
pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
use grenad::Merger;

View File

@ -26,7 +26,7 @@ impl<'a> DocumentTokenizer<'a> {
&self,
obkv: &KvReaderFieldId,
field_id_map: &mut GlobalFieldsIdsMap,
token_fn: &mut impl FnMut(FieldId, u16, &str) -> Result<()>,
token_fn: &mut impl FnMut(&str, FieldId, u16, &str) -> Result<()>,
) -> Result<()> {
let mut field_position = HashMap::new();
let mut field_name = String::new();
@ -56,7 +56,7 @@ impl<'a> DocumentTokenizer<'a> {
Value::Number(n) => {
let token = n.to_string();
if let Ok(position) = (*position).try_into() {
token_fn(field_id, position, token.as_str())?;
token_fn(name, field_id, position, token.as_str())?;
}
Ok(())
@ -80,7 +80,7 @@ impl<'a> DocumentTokenizer<'a> {
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
*position = index;
if let Ok(position) = (*position).try_into() {
token_fn(field_id, position, token)?;
token_fn(name, field_id, position, token)?;
}
}
}
@ -235,7 +235,7 @@ mod test {
let mut words = std::collections::BTreeMap::new();
document_tokenizer
.tokenize_document(obkv, &mut global_fields_ids_map, &mut |fid, pos, word| {
.tokenize_document(obkv, &mut global_fields_ids_map, &mut |_fname, fid, pos, word| {
words.insert([fid, pos], word.to_string());
Ok(())
})

View File

@ -58,7 +58,7 @@ where
{
let (merger_sender, writer_receiver) = merger_writer_channel(10_000);
// This channel acts as a rendezvous point to ensure that we are one task ahead
let (extractor_sender, merger_receiver) = extractors_merger_channels(0);
let (extractor_sender, merger_receiver) = extractors_merger_channels(4);
let fields_ids_map_lock = RwLock::new(fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
@ -103,62 +103,56 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
extract_and_send_docids::<WordDocidsExtractor, WordDocids>(
index,
&global_fields_ids_map,
grenad_parameters,
document_changes.clone(),
&extractor_sender,
)?;
let WordDocidsMergers {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(index, &global_fields_ids_map, grenad_parameters, document_changes.clone())?;
extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_fid_docids");
let _entered = span.enter();
extract_and_send_docids::<WordFidDocidsExtractor, WordFidDocids>(
index,
&global_fields_ids_map,
grenad_parameters,
document_changes.clone(),
&extractor_sender,
)?;
}
// {
// let span = tracing::trace_span!(target: "indexing::documents::extract", "exact_word_docids");
// let _entered = span.enter();
// extract_and_send_docids::<ExactWordDocidsExtractor, ExactWordDocids>(
// index,
// &global_fields_ids_map,
// grenad_parameters,
// document_changes.clone(),
// &extractor_sender,
// )?;
// }
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "exact_word_docids");
let _entered = span.enter();
extract_and_send_docids::<ExactWordDocidsExtractor, ExactWordDocids>(
index,
&global_fields_ids_map,
grenad_parameters,
document_changes.clone(),
&extractor_sender,
)?;
}
// {
// let span = tracing::trace_span!(target: "indexing::documents::extract", "word_position_docids");
// let _entered = span.enter();
// extract_and_send_docids::<WordPositionDocidsExtractor, WordPositionDocids>(
// index,
// &global_fields_ids_map,
// grenad_parameters,
// document_changes.clone(),
// &extractor_sender,
// )?;
// }
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_position_docids");
let _entered = span.enter();
extract_and_send_docids::<WordPositionDocidsExtractor, WordPositionDocids>(
index,
&global_fields_ids_map,
grenad_parameters,
document_changes.clone(),
&extractor_sender,
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "fid_word_count_docids");
let _entered = span.enter();
extract_and_send_docids::<FidWordCountDocidsExtractor, FidWordCountDocids>(
index,
&global_fields_ids_map,
GrenadParameters::default(),
document_changes.clone(),
&extractor_sender,
)?;
}
// {
// let span = tracing::trace_span!(target: "indexing::documents::extract", "fid_word_count_docids");
// let _entered = span.enter();
// extract_and_send_docids::<FidWordCountDocidsExtractor, FidWordCountDocids>(
// index,
// &global_fields_ids_map,
// GrenadParameters::default(),
// document_changes.clone(),
// &extractor_sender,
// )?;
// }
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");