Merge pull request #34 from meilisearch/speedup-indexing

Write the word pair proximities directly into LMDB to speed up indexing
This commit is contained in:
Clément Renault 2020-11-11 11:30:28 +01:00 committed by GitHub
commit 8a4794fc51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -325,7 +325,6 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
enum DatabaseType { enum DatabaseType {
Main, Main,
WordDocids, WordDocids,
WordsPairsProximitiesDocids,
} }
let searchable_fields: HashSet<_> = match self.index.searchable_fields(self.wtxn)? { let searchable_fields: HashSet<_> = match self.index.searchable_fields(self.wtxn)? {
@ -352,7 +351,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
}, },
}; };
let (receiver, docid_word_positions_readers, documents_readers) = pool.install(|| { let readers = pool.install(|| {
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
let max_memory_by_job = max_memory.map(|mm| mm / num_threads); let max_memory_by_job = max_memory.map(|mm| mm / num_threads);
@ -405,18 +404,13 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
// The enum and the channel which is used to transfer // The enum and the channel which is used to transfer
// the readers merges potentially done on another thread. // the readers merges potentially done on another thread.
let (sender, receiver) = sync_channel(3); let (sender, receiver) = sync_channel(2);
debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); debug!("Merging the main, word docids and words pairs proximity docids in parallel...");
rayon::spawn(move || { rayon::spawn(move || {
vec![ vec![
(DatabaseType::Main, main_readers, main_merge as MergeFn), (DatabaseType::Main, main_readers, main_merge as MergeFn),
(DatabaseType::WordDocids, word_docids_readers, word_docids_merge), (DatabaseType::WordDocids, word_docids_readers, word_docids_merge),
(
DatabaseType::WordsPairsProximitiesDocids,
words_pairs_proximities_docids_readers,
words_pairs_proximities_docids_merge,
),
] ]
.into_par_iter() .into_par_iter()
.for_each(|(dbtype, readers, merge)| { .for_each(|(dbtype, readers, merge)| {
@ -427,9 +421,21 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
}); });
}); });
Ok((receiver, docid_word_positions_readers, documents_readers)) as anyhow::Result<_> Ok((
receiver,
docid_word_positions_readers,
documents_readers,
words_pairs_proximities_docids_readers,
)) as anyhow::Result<_>
})?; })?;
let (
receiver,
docid_word_positions_readers,
documents_readers,
words_pairs_proximities_docids_readers,
) = readers;
let mut documents_ids = self.index.documents_ids(self.wtxn)?; let mut documents_ids = self.index.documents_ids(self.wtxn)?;
let contains_documents = !documents_ids.is_empty(); let contains_documents = !documents_ids.is_empty();
let write_method = if contains_documents { let write_method = if contains_documents {
@ -472,6 +478,15 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
write_method write_method
)?; )?;
debug!("Writing the words pairs proximities docids into LMDB on disk...");
merge_into_lmdb_database(
self.wtxn,
*self.index.word_pair_proximity_docids.as_polymorph(),
words_pairs_proximities_docids_readers,
words_pairs_proximities_docids_merge,
write_method,
)?;
for (db_type, result) in receiver { for (db_type, result) in receiver {
let content = result?; let content = result?;
match db_type { match db_type {
@ -496,17 +511,6 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
write_method, write_method,
)?; )?;
}, },
DatabaseType::WordsPairsProximitiesDocids => {
debug!("Writing the words pairs proximities docids into LMDB on disk...");
let db = *self.index.word_pair_proximity_docids.as_polymorph();
write_into_lmdb_database(
self.wtxn,
db,
content,
words_pairs_proximities_docids_merge,
write_method,
)?;
},
} }
} }