Louis Dureuil 2023-12-07 17:03:10 +01:00
parent dde3a04679
commit cb4ebe163e
8 changed files with 185 additions and 157 deletions


@@ -42,9 +42,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_id_docid,
             vector_arroy,
-            docid_vector_ids,
             embedder_category_id: _,
             documents,
         } = self.index;
@@ -86,8 +84,6 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
         field_id_docid_facet_strings.clear(self.wtxn)?;
         // vector
         vector_arroy.clear(self.wtxn)?;
-        vector_id_docid.clear(self.wtxn)?;
-        docid_vector_ids.clear(self.wtxn)?;
         documents.clear(self.wtxn)?;


@@ -418,7 +418,7 @@ where
            }
            // needs to be dropped to avoid channel waiting lock.
-            drop(lmdb_writer_sx)
+            drop(lmdb_writer_sx);
        });

        let index_is_empty = self.index.number_of_documents(self.wtxn)? == 0;
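The semicolon fix matters because the `for result in lmdb_writer_rx` loop below only terminates once every sender handle is gone, as the original comment says. A minimal illustration of that pattern using std's mpsc (the actual channel crate used by this code is not shown in this hunk):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    std::thread::spawn(move || {
        tx.send(1).unwrap();
        // Dropping the last sender ends the receiver's iteration below;
        // forgetting to drop it would leave the loop waiting forever.
        drop(tx);
    });
    for value in rx {
        println!("{value}");
    }
}
```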
@@ -435,6 +435,8 @@ where
        let mut word_docids = None;
        let mut exact_word_docids = None;
+        let mut dimension = None;

        for result in lmdb_writer_rx {
            if (self.should_abort)() {
                return Err(Error::InternalError(InternalError::AbortedIndexation));
@@ -464,6 +466,20 @@ where
                    word_position_docids = Some(cloneable_chunk);
                    TypedChunk::WordPositionDocids(chunk)
                }
+                TypedChunk::VectorPoints {
+                    expected_dimension,
+                    remove_vectors,
+                    embeddings,
+                    manual_vectors,
+                } => {
+                    dimension = Some(expected_dimension);
+                    TypedChunk::VectorPoints {
+                        remove_vectors,
+                        embeddings,
+                        expected_dimension,
+                        manual_vectors,
+                    }
+                }
                otherwise => otherwise,
            };
@@ -490,9 +506,6 @@ where
            }
        }

-        let writer = arroy::Writer::prepare(self.wtxn, self.index.vector_arroy, 0, 0)?;
-        writer.build(self.wtxn, &mut rand::rngs::StdRng::from_entropy(), None)?;
-
        // We write the field distribution into the main database
        self.index.put_field_distribution(self.wtxn, &field_distribution)?;
@@ -500,6 +513,23 @@ where
        self.index.put_primary_key(self.wtxn, &primary_key)?;
        let number_of_documents = self.index.number_of_documents(self.wtxn)?;

+        if let Some(dimension) = dimension {
+            let wtxn = &mut *self.wtxn;
+            let vector_arroy = self.index.vector_arroy;
+            pool.install(|| {
+                // FIXME: do this for each embedder
+                let mut rng = rand::rngs::StdRng::from_entropy();
+                for k in 0..=u8::MAX {
+                    let writer = arroy::Writer::prepare(wtxn, vector_arroy, k.into(), dimension)?;
+                    if writer.is_empty(wtxn)? {
+                        break;
+                    }
+                    writer.build(wtxn, &mut rng, None)?;
+                }
+                Result::Ok(())
+            })?;
+        }
+
        self.execute_prefix_databases(
            word_docids,
            exact_word_docids,
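Read together with the `TypedChunk::VectorPoints` arm above, the new tail end of indexing is: record the embedding dimension while chunks are written, then build one arroy index per occupied slot, stopping at the first empty one. A minimal standalone sketch of that build loop; the `Distance` bound, the `Database` alias, and the function name are assumptions, while `prepare`, `is_empty`, and `build` are the calls visible in this diff:

```rust
use rand::SeedableRng;

// Sketch: build every contiguous, non-empty arroy index. Slots are packed,
// so the first empty index means no later one is in use.
fn build_occupied_indexes<D: arroy::Distance>(
    wtxn: &mut heed::RwTxn,
    database: arroy::Database<D>,
    dimension: usize,
) -> arroy::Result<()> {
    let mut rng = rand::rngs::StdRng::from_entropy();
    for k in 0..=u8::MAX {
        let writer = arroy::Writer::prepare(wtxn, database, k.into(), dimension)?;
        if writer.is_empty(wtxn)? {
            break;
        }
        writer.build(wtxn, &mut rng, None)?;
    }
    Ok(())
}
```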


@@ -8,9 +8,7 @@ use charabia::{Language, Script};
 use grenad::MergerBuilder;
 use heed::types::Bytes;
 use heed::{PutFlags, RwTxn};
-use log::error;
 use obkv::{KvReader, KvWriter};
-use ordered_float::OrderedFloat;
 use roaring::RoaringBitmap;
@@ -18,16 +16,12 @@ use super::helpers::{
     valid_lmdb_key, CursorClonableMmap,
 };
 use super::{ClonableMmap, MergeFn};
-use crate::distance::NDotProductPoint;
 use crate::error::UserError;
 use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
 use crate::facet::FacetType;
 use crate::index::db_name::DOCUMENTS;
-use crate::index::Hnsw;
 use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
 use crate::update::facet::FacetsUpdate;
 use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
-use crate::update::{available_documents_ids, AvailableDocumentsIds};
 use crate::{lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, Result, SerializationError};
@@ -374,28 +368,28 @@ pub(crate) fn write_typed_chunk_into_index(
                return Ok((RoaringBitmap::new(), is_merged_database));
            }

-            let mut unavailable_vector_ids = index.unavailable_vector_ids(&wtxn)?;
-            /// FIXME: allow customizing distance
-            /// FIXME: allow customizing index
-            let writer = arroy::Writer::prepare(wtxn, index.vector_arroy, 0, expected_dimension)?;
+            let writers: std::result::Result<Vec<_>, _> = (0..=u8::MAX)
+                .map(|k| {
+                    // FIXME: allow customizing the index, and then do (index << 8) + k
+                    arroy::Writer::prepare(wtxn, index.vector_arroy, k.into(), expected_dimension)
+                })
+                .collect();
+            let writers = writers?;

            // remove vectors for the docids we want removed
            let mut cursor = remove_vectors.into_cursor()?;
            while let Some((key, _)) = cursor.move_on_next()? {
                let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();

-                let Some(to_remove_vector_ids) = index.docid_vector_ids.get(&wtxn, &docid)? else {
-                    continue;
-                };
-                unavailable_vector_ids -= to_remove_vector_ids;
-                for item in to_remove_vector_ids {
-                    writer.del_item(wtxn, item)?;
+                for writer in &writers {
+                    // Uses the invariant: vectors are packed into the first writers.
+                    if !writer.del_item(wtxn, docid)? {
+                        break;
+                    }
                }
            }

-            let mut available_vector_ids =
-                AvailableDocumentsIds::from_documents_ids(&unavailable_vector_ids);
            // add generated embeddings
            if let Some(embeddings) = embeddings {
                let mut cursor = embeddings.into_cursor()?;
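The scheme the hunk above sets up: writer `k` holds, for each document, that document's (k+1)-th embedding under the document id itself, so a document's vectors always occupy a contiguous prefix of the writers. Deleting a document's vectors can therefore stop at the first writer that has nothing to delete. A hedged sketch of that walk; the `Distance` bound and the function name are assumptions, while `del_item` and its boolean return come from this diff:

```rust
// Sketch: remove every vector of `docid`, leaning on the packing invariant
// that its embeddings occupy writers 0..n with no holes.
fn delete_document_vectors<D: arroy::Distance>(
    wtxn: &mut heed::RwTxn,
    writers: &[arroy::Writer<D>],
    docid: u32,
) -> arroy::Result<()> {
    for writer in writers {
        // `del_item` returns false when the item was absent; by the packing
        // invariant, no later writer can hold a vector for `docid` either.
        if !writer.del_item(wtxn, docid)? {
            break;
        }
    }
    Ok(())
}
```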
@@ -408,19 +402,10 @@ pub(crate) fn write_typed_chunk_into_index(
                        // code error if we somehow got the wrong dimension
                        .unwrap();

-                    let mut new_vector_ids = RoaringBitmap::new();
-                    for embedding in embeddings.iter() {
-                        /// FIXME: error when you get over 9000
-                        let next_vector_id = available_vector_ids.next().unwrap();
-                        unavailable_vector_ids.insert(next_vector_id);
-                        new_vector_ids.insert(next_vector_id);
-                        index.vector_id_docid.put(wtxn, &next_vector_id, &docid)?;
-                        writer.add_item(wtxn, next_vector_id, embedding)?;
+                    // FIXME: detect overflow
+                    for (embedding, writer) in embeddings.iter().zip(&writers) {
+                        writer.add_item(wtxn, docid, embedding)?;
                    }
-                    index.docid_vector_ids.put(wtxn, &docid, &new_vector_ids)?;
                }
            }
@@ -433,44 +418,52 @@ pub(crate) fn write_typed_chunk_into_index(
                    let vector_deladd_obkv = KvReaderDelAdd::new(value);

                    if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
-                        let vector = pod_collect_to_vec(value);
-                        let Some(mut docid_vector_ids) = index.docid_vector_ids.get(&wtxn, &docid)?
-                        else {
-                            error!("Unable to delete the vector: {:?}", vector);
-                            continue;
-                        };
-                        for item in docid_vector_ids {
-                            /// FIXME: comparing the vectors by equality is inefficient, and dangerous by perfect equality
-                            let candidate = writer.item_vector(&wtxn, item)?.expect("Inconsistent dbs");
-                            if candidate == vector {
-                                writer.del_item(wtxn, item)?;
-                                unavailable_vector_ids.remove(item);
-                                index.vector_id_docid.delete(wtxn, &item)?;
-                                docid_vector_ids.remove(item);
-                            }
-                        }
-                        index.docid_vector_ids.put(wtxn, &docid, &docid_vector_ids)?;
+                        let vector: Vec<f32> = pod_collect_to_vec(value);
+                        let mut deleted_index = None;
+                        for (index, writer) in writers.iter().enumerate() {
+                            let Some(candidate) = writer.item_vector(&wtxn, docid)? else {
+                                // uses the invariant: vectors are packed into the first writers.
+                                break;
+                            };
+                            if candidate == vector {
+                                writer.del_item(wtxn, docid)?;
+                                deleted_index = Some(index);
+                            }
+                        }
+                        // 🥲 enforce the invariant: vectors are packed into the first writers.
+                        if let Some(deleted_index) = deleted_index {
+                            let mut last_index_with_a_vector = None;
+                            for (index, writer) in
+                                writers.iter().enumerate().skip(deleted_index + 1)
+                            {
+                                let Some(candidate) = writer.item_vector(&wtxn, docid)? else {
+                                    break;
+                                };
+                                last_index_with_a_vector = Some((index, candidate));
+                            }
+                            if let Some((last_index, vector)) = last_index_with_a_vector {
+                                // unwrap: the index was computed from the list of writers
+                                let writer = writers.get(last_index).unwrap();
+                                writer.del_item(wtxn, docid)?;
+                                writers.get(deleted_index).unwrap().add_item(wtxn, docid, &vector)?;
+                            }
+                        }
                    }

-                    let mut available_vector_ids =
-                        AvailableDocumentsIds::from_documents_ids(&unavailable_vector_ids);
                    if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) {
                        let vector = pod_collect_to_vec(value);
-                        let next_vector_id = available_vector_ids.next().unwrap();
-                        writer.add_item(wtxn, next_vector_id, &vector)?;
-                        unavailable_vector_ids.insert(next_vector_id);
-                        index.vector_id_docid.put(wtxn, &next_vector_id, &docid)?;
-                        let mut docid_vector_ids =
-                            index.docid_vector_ids.get(&wtxn, &docid)?.unwrap_or_default();
-                        docid_vector_ids.insert(next_vector_id);
-                        index.docid_vector_ids.put(wtxn, &docid, &docid_vector_ids)?;
+                        // FIXME: detect overflow
+                        for writer in &writers {
+                            if !writer.contains_item(wtxn, docid)? {
+                                writer.add_item(wtxn, docid, &vector)?;
+                                break;
+                            }
+                        }
                    }
                }

-                log::debug!("There are {} entries in the arroy so far", unavailable_vector_ids.len());
-                index.put_unavailable_vector_ids(wtxn, unavailable_vector_ids)?;
+                log::debug!("There are 🤷‍♀️ entries in the arroy so far");
            }
            TypedChunk::ScriptLanguageDocids(sl_map) => {
                for (key, (deletion, addition)) in sl_map {
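The repair step in the deletion branch above is what keeps the packing invariant true after a hole appears: find the last occupied slot past the hole and move its vector down into it. In isolation, and under the same assumed `Distance` bound as the sketches above (with `item_vector`, `del_item`, and `add_item` taken from this diff), it looks roughly like:

```rust
// Sketch: after `docid`'s vector was deleted from slot `deleted`, move the
// vector from the last occupied slot into the hole so the occupied slots
// stay a contiguous prefix of the writers.
fn repack_after_delete<D: arroy::Distance>(
    wtxn: &mut heed::RwTxn,
    writers: &[arroy::Writer<D>],
    docid: u32,
    deleted: usize,
) -> arroy::Result<()> {
    // Walk the slots past the hole until the first empty one.
    let mut last = None;
    for (index, writer) in writers.iter().enumerate().skip(deleted + 1) {
        let Some(vector) = writer.item_vector(wtxn, docid)? else { break };
        last = Some((index, vector));
    }
    // If any occupied slot sits past the hole, move the tail vector down.
    if let Some((last_index, vector)) = last {
        writers[last_index].del_item(wtxn, docid)?;
        writers[deleted].add_item(wtxn, docid, &vector)?;
    }
    Ok(())
}
```

Note that the walk starts one slot past the hole: the hole itself is already empty, so starting the scan on it would find nothing to move.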