reimplement the batching of tasks with or without a primary key in the autobatcher

This commit is contained in:
Tamo 2023-01-23 20:16:16 +01:00
parent 5672118bfa
commit 767cb725a5
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
8 changed files with 206 additions and 234 deletions

View file

@ -207,7 +207,7 @@ impl IndexScheduler {
must_create_index,
})),
BatchKind::DocumentImport { method, import_ids, .. } => {
let mut tasks = self.get_existing_tasks(rtxn, import_ids)?;
let tasks = self.get_existing_tasks(rtxn, import_ids)?;
let primary_key = match &tasks[0].kind {
KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => {
primary_key.clone()
@ -217,18 +217,9 @@ impl IndexScheduler {
let mut documents_counts = Vec::new();
let mut content_files = Vec::new();
let mut drain_after = tasks.len();
for (i, task) in tasks.iter().enumerate() {
for task in tasks.iter() {
match task.kind {
KindWithContent::DocumentAdditionOrUpdate {
primary_key: ref pk, ..
} if pk != &primary_key => {
// we can't autobatch document additions that don't share the same
// primary key because that would make the whole batch fail.
drain_after = i;
break;
}
KindWithContent::DocumentAdditionOrUpdate {
content_file,
documents_count,
@ -241,8 +232,6 @@ impl IndexScheduler {
}
}
tasks.drain(drain_after..);
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentImport {
index_uid,
@ -337,6 +326,7 @@ impl IndexScheduler {
settings_ids,
method,
allow_index_creation,
primary_key,
import_ids,
} => {
let settings = self.create_next_batch_index(
@ -349,7 +339,12 @@ impl IndexScheduler {
let document_import = self.create_next_batch_index(
rtxn,
index_uid.clone(),
BatchKind::DocumentImport { method, allow_index_creation, import_ids },
BatchKind::DocumentImport {
method,
allow_index_creation,
primary_key,
import_ids,
},
must_create_index,
)?;
@ -479,6 +474,12 @@ impl IndexScheduler {
};
let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
let mut primary_key = None;
if index_already_exists {
let index = self.index_mapper.index(rtxn, index_name)?;
let rtxn = index.read_txn()?;
primary_key = index.primary_key(&rtxn)?.map(|pk| pk.to_string());
}
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
@ -496,7 +497,7 @@ impl IndexScheduler {
.collect::<Result<Vec<_>>>()?;
if let Some((batchkind, create_index)) =
autobatcher::autobatch(enqueued, index_already_exists)
autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())
{
return self.create_next_batch_index(
rtxn,