mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 23:04:26 +01:00
Fix the errors when using the try_map_try_init method
This commit is contained in:
parent
31de5c747e
commit
f3356ddaa4
@ -1,6 +1,7 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::num::NonZero;
|
use std::num::NonZero;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use grenad::{Merger, MergerBuilder};
|
use grenad::{Merger, MergerBuilder};
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
@ -12,7 +13,7 @@ use crate::update::new::extract::perm_json_p::contained_in;
|
|||||||
use crate::update::new::{DocumentChange, ItemsPool};
|
use crate::update::new::{DocumentChange, ItemsPool};
|
||||||
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
|
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
|
||||||
use crate::{
|
use crate::{
|
||||||
bucketed_position, DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result,
|
bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result,
|
||||||
MAX_POSITION_PER_ATTRIBUTE,
|
MAX_POSITION_PER_ATTRIBUTE,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -303,7 +304,9 @@ impl WordDocidsExtractors {
|
|||||||
index: &Index,
|
index: &Index,
|
||||||
fields_ids_map: &GlobalFieldsIdsMap,
|
fields_ids_map: &GlobalFieldsIdsMap,
|
||||||
indexer: GrenadParameters,
|
indexer: GrenadParameters,
|
||||||
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
|
document_changes: impl IntoParallelIterator<
|
||||||
|
Item = std::result::Result<DocumentChange, Arc<Error>>,
|
||||||
|
>,
|
||||||
) -> Result<WordDocidsMergers> {
|
) -> Result<WordDocidsMergers> {
|
||||||
let max_memory = indexer.max_memory_by_thread();
|
let max_memory = indexer.max_memory_by_thread();
|
||||||
|
|
||||||
|
@ -1,8 +1,11 @@
|
|||||||
use rayon::iter::IndexedParallelIterator;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator};
|
||||||
|
|
||||||
use super::DocumentChanges;
|
use super::DocumentChanges;
|
||||||
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
|
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
|
||||||
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
||||||
|
use crate::update::new::items_pool::ParallelIteratorExt;
|
||||||
use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId};
|
use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId};
|
||||||
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
|
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
|
||||||
|
|
||||||
@ -37,41 +40,46 @@ where
|
|||||||
> {
|
> {
|
||||||
let (fields_ids_map, concurrent_available_ids, primary_key) = param;
|
let (fields_ids_map, concurrent_available_ids, primary_key) = param;
|
||||||
|
|
||||||
Ok(self.iter.map(|object| {
|
Ok(self.iter.try_map_try_init(
|
||||||
let docid = match concurrent_available_ids.next() {
|
|| Ok(()),
|
||||||
Some(id) => id,
|
|_, object| {
|
||||||
None => return Err(Error::UserError(UserError::DocumentLimitReached)),
|
let docid = match concurrent_available_ids.next() {
|
||||||
};
|
Some(id) => id,
|
||||||
|
None => return Err(Error::UserError(UserError::DocumentLimitReached)),
|
||||||
|
};
|
||||||
|
|
||||||
let mut writer = KvWriterFieldId::memory();
|
let mut writer = KvWriterFieldId::memory();
|
||||||
object.iter().for_each(|(key, value)| {
|
object.iter().for_each(|(key, value)| {
|
||||||
let key = fields_ids_map.id(key).unwrap();
|
let key = fields_ids_map.id(key).unwrap();
|
||||||
/// TODO better error management
|
/// TODO better error management
|
||||||
let value = serde_json::to_vec(&value).unwrap();
|
let value = serde_json::to_vec(&value).unwrap();
|
||||||
/// TODO it is not ordered
|
/// TODO it is not ordered
|
||||||
writer.insert(key, value).unwrap();
|
writer.insert(key, value).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let document = writer.into_boxed();
|
let document = writer.into_boxed();
|
||||||
let external_docid = match primary_key.document_id(&document, fields_ids_map)? {
|
let external_docid = match primary_key.document_id(&document, fields_ids_map)? {
|
||||||
Ok(document_id) => Ok(document_id),
|
Ok(document_id) => Ok(document_id),
|
||||||
Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error),
|
Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
|
||||||
Err(DocumentIdExtractionError::MissingDocumentId) => {
|
Err(user_error)
|
||||||
Err(UserError::MissingDocumentId {
|
}
|
||||||
primary_key: primary_key.name().to_string(),
|
Err(DocumentIdExtractionError::MissingDocumentId) => {
|
||||||
document: all_obkv_to_json(&document, fields_ids_map)?,
|
Err(UserError::MissingDocumentId {
|
||||||
})
|
primary_key: primary_key.name().to_string(),
|
||||||
}
|
document: all_obkv_to_json(&document, fields_ids_map)?,
|
||||||
Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
|
})
|
||||||
Err(UserError::TooManyDocumentIds {
|
}
|
||||||
primary_key: primary_key.name().to_string(),
|
Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
|
||||||
document: all_obkv_to_json(&document, fields_ids_map)?,
|
Err(UserError::TooManyDocumentIds {
|
||||||
})
|
primary_key: primary_key.name().to_string(),
|
||||||
}
|
document: all_obkv_to_json(&document, fields_ids_map)?,
|
||||||
}?;
|
})
|
||||||
|
}
|
||||||
|
}?;
|
||||||
|
|
||||||
let insertion = Insertion::create(docid, document);
|
let insertion = Insertion::create(docid, document);
|
||||||
Ok(DocumentChange::Insertion(insertion))
|
Ok(DocumentChange::Insertion(insertion))
|
||||||
}))
|
},
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,9 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError};
|
|||||||
use rayon::iter::{MapInit, ParallelIterator};
|
use rayon::iter::{MapInit, ParallelIterator};
|
||||||
|
|
||||||
pub trait ParallelIteratorExt: ParallelIterator {
|
pub trait ParallelIteratorExt: ParallelIterator {
|
||||||
/// A method on a parallel iterator to map
|
/// Maps items based on the init function.
|
||||||
|
///
|
||||||
|
/// The init function is ran only as necessary which is basically once by thread.
|
||||||
fn try_map_try_init<F, INIT, T, E, R>(
|
fn try_map_try_init<F, INIT, T, E, R>(
|
||||||
self,
|
self,
|
||||||
init: INIT,
|
init: INIT,
|
||||||
|
Loading…
Reference in New Issue
Block a user