5262: Bring back changes from v1.12.4, v1.12.5, and v1.12.6 into main r=dureuill a=Kerollmops

This PR follows [this guideline to bring back changes after we worked on v1.12.4, v1.12.5, and v1.12.6](https://github.com/meilisearch/engine-team/blob/main/resources/meilisearch-release.md#after-the-release-bring-back-changes-to-main).

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Kerollmops <Kerollmops@users.noreply.github.com>
Commit 6723700fb9 by meili-bors[bot], 2025-01-22 14:55:02 +00:00, committed by GitHub
10 changed files with 249 additions and 87 deletions

Cargo.lock (generated)
View File

@@ -6231,9 +6231,9 @@ dependencies = [

 [[package]]
 name = "utoipa-scalar"
-version = "0.2.1"
+version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "088e93bf19f6bd06e0aacb02ca432b3c5a449c4aec2e4aa9fc333a667f2b2c55"
+checksum = "59559e1509172f6b26c1cdbc7247c4ddd1ac6560fe94b584f81ee489b141f719"
 dependencies = [
  "actix-web",
  "serde",

View File

@@ -1,8 +1,9 @@
 use std::collections::HashMap;
+use std::io;

 use dump::{KindDump, TaskDump, UpdateFile};
 use meilisearch_types::heed::RwTxn;
-use meilisearch_types::milli::documents::DocumentsBatchBuilder;
+use meilisearch_types::milli;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use roaring::RoaringBitmap;
 use uuid::Uuid;
@@ -39,14 +40,19 @@ impl<'a> Dump<'a> {
         task: TaskDump,
         content_file: Option<Box<UpdateFile>>,
     ) -> Result<Task> {
+        let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0);
+
         let content_uuid = match content_file {
             Some(content_file) if task.status == Status::Enqueued => {
-                let (uuid, mut file) = self.index_scheduler.queue.create_update_file(false)?;
-                let mut builder = DocumentsBatchBuilder::new(&mut file);
+                let (uuid, file) = self.index_scheduler.queue.create_update_file(false)?;
+                let mut writer = io::BufWriter::new(file);
                 for doc in content_file {
-                    builder.append_json_object(&doc?)?;
+                    let doc = doc?;
+                    serde_json::to_writer(&mut writer, &doc).map_err(|e| {
+                        Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
+                    })?;
                 }
-                builder.into_inner()?;
+                let file = writer.into_inner().map_err(|e| e.into_error())?;
                 file.persist()?;
                 Some(uuid)
@@ -54,6 +60,12 @@ impl<'a> Dump<'a> {
             // If the task isn't `Enqueued` then just generate a recognisable `Uuid`
             // in case we try to open it later.
             _ if task.status != Status::Enqueued => Some(Uuid::nil()),
+            None if task.status == Status::Enqueued && task_has_no_docs => {
+                let (uuid, file) = self.index_scheduler.queue.create_update_file(false)?;
+                file.persist()?;
+                Some(uuid)
+            }
             _ => None,
         };

View File

@@ -4,7 +4,6 @@ use std::sync::atomic::Ordering;
 use dump::IndexMetadata;
 use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
-use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 use meilisearch_types::milli::progress::Progress;
 use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
 use meilisearch_types::milli::{self};
@@ -72,6 +71,13 @@ impl IndexScheduler {
                 t.started_at = Some(started_at);
                 t.finished_at = Some(finished_at);
             }
+
+            // Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
+            // This prevent from referencing *future* batches not actually associated with the task.
+            //
+            // See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
+            t.batch_uid = None;
+
             let mut dump_content_file = dump_tasks.push_task(&t.into())?;

             // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
@@ -82,19 +88,15 @@ impl IndexScheduler {
             if status == Status::Enqueued {
                 let content_file = self.queue.file_store.get_update(content_file)?;

-                let reader = DocumentsBatchReader::from_reader(content_file)
-                    .map_err(|e| Error::from_milli(e.into(), None))?;
-                let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
-                while let Some(doc) =
-                    cursor.next_document().map_err(|e| Error::from_milli(e.into(), None))?
+                for document in
+                    serde_json::de::Deserializer::from_reader(content_file).into_iter()
                 {
-                    dump_content_file.push_document(
-                        &obkv_to_object(doc, &documents_batch_index)
-                            .map_err(|e| Error::from_milli(e, None))?,
-                    )?;
+                    let document = document.map_err(|e| {
+                        Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
+                    })?;
+                    dump_content_file.push_document(&document)?;
                 }
                 dump_content_file.flush()?;
             }
         }
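Both hunks above move the task update files away from the obkv `DocumentsBatchBuilder`/`DocumentsBatchReader` format and onto a plain stream of concatenated JSON documents. As a minimal, standalone sketch of that pattern (illustrative only, not the Meilisearch call sites; the document contents are made up), writing such a stream with `serde_json::to_writer` and reading it back with a `StreamDeserializer` looks like this:

use std::io::{BufWriter, Cursor};

fn main() -> serde_json::Result<()> {
    // Write side: each document is appended as one bare JSON value, with no
    // surrounding array and no separators, exactly what `serde_json::to_writer` emits.
    let mut writer = BufWriter::new(Vec::new());
    for doc in [serde_json::json!({ "id": 1 }), serde_json::json!({ "id": 2 })] {
        serde_json::to_writer(&mut writer, &doc)?;
    }
    let bytes = writer.into_inner().expect("flushing an in-memory buffer cannot fail");

    // Read side: `Deserializer::from_reader(..).into_iter()` yields one value per
    // concatenated document, which is how the dump code iterates over a content file.
    for document in serde_json::Deserializer::from_reader(Cursor::new(bytes)).into_iter::<serde_json::Value>() {
        println!("{}", document?);
    }
    Ok(())
}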

View File

@@ -105,8 +105,16 @@ tracing-actix-web = "0.7.15"
 build-info = { version = "1.7.0", path = "../build-info" }
 roaring = "0.10.10"
 mopa-maintained = "0.2.3"
-utoipa = { version = "5.3.1", features = ["actix_extras", "macros", "non_strict_integers", "preserve_order", "uuid", "time", "openapi_extensions"] }
-utoipa-scalar = { version = "0.2.1", optional = true, features = ["actix-web"] }
+utoipa = { version = "5.3.1", features = [
+    "actix_extras",
+    "macros",
+    "non_strict_integers",
+    "preserve_order",
+    "uuid",
+    "time",
+    "openapi_extensions",
+] }
+utoipa-scalar = { version = "0.3.0", optional = true, features = ["actix-web"] }

 [dev-dependencies]
 actix-rt = "2.10.0"

View File

@@ -1524,3 +1524,57 @@ async fn change_attributes_settings() {
     )
     .await;
 }
+
+/// Modifying facets with different casing should work correctly
+#[actix_rt::test]
+async fn change_facet_casing() {
+    let server = Server::new().await;
+    let index = server.index("test");
+    let (response, code) = index
+        .update_settings(json!({
+            "filterableAttributes": ["dog"],
+        }))
+        .await;
+    assert_eq!("202", code.as_str(), "{:?}", response);
+    index.wait_task(response.uid()).await;
+
+    let (response, _code) = index
+        .add_documents(
+            json!([
+                {
+                    "id": 1,
+                    "dog": "Bouvier Bernois"
+                }
+            ]),
+            None,
+        )
+        .await;
+    index.wait_task(response.uid()).await;
+
+    let (response, _code) = index
+        .add_documents(
+            json!([
+                {
+                    "id": 1,
+                    "dog": "bouvier bernois"
+                }
+            ]),
+            None,
+        )
+        .await;
+    index.wait_task(response.uid()).await;
+
+    index
+        .search(json!({ "facets": ["dog"] }), |response, code| {
+            meili_snap::snapshot!(code, @"200 OK");
+            meili_snap::snapshot!(meili_snap::json_string!(response["facetDistribution"]), @r###"
+            {
+              "dog": {
+                "bouvier bernois": 1
+              }
+            }
+            "###);
+        })
+        .await;
+}

View File

@@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
     match command {
         Command::ClearTaskQueue => clear_task_queue(db_path),
         Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
-            export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
+            export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
         }
         Command::OfflineUpgrade { target_version } => {
             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
@@ -187,6 +187,7 @@ fn export_a_dump(
     db_path: PathBuf,
     dump_dir: PathBuf,
     skip_enqueued_tasks: bool,
+    detected_version: (String, String, String),
 ) -> Result<(), anyhow::Error> {
     let started_at = OffsetDateTime::now_utc();
@@ -238,15 +239,13 @@ fn export_a_dump(
     if skip_enqueued_tasks {
         eprintln!("Skip dumping the enqueued tasks...");
     } else {
-        eprintln!("Dumping the enqueued tasks...");
-        // 3. dump the tasks
         let mut dump_tasks = dump.create_tasks_queue()?;
         let mut count = 0;
         for ret in all_tasks.iter(&rtxn)? {
             let (_, t) = ret?;
             let status = t.status;
             let content_file = t.content_uuid();

             let mut dump_content_file = dump_tasks.push_task(&t.into())?;

             // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
@@ -254,18 +253,39 @@ fn export_a_dump(
             if status == Status::Enqueued {
                 let content_file = file_store.get_update(content_file_uuid)?;

-                let reader =
-                    DocumentsBatchReader::from_reader(content_file).with_context(|| {
-                        format!("While reading content file {:?}", content_file_uuid)
-                    })?;
-                let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
-                while let Some(doc) = cursor.next_document().with_context(|| {
-                    format!("While iterating on content file {:?}", content_file_uuid)
-                })? {
-                    dump_content_file
-                        .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
+                if (
+                    detected_version.0.as_str(),
+                    detected_version.1.as_str(),
+                    detected_version.2.as_str(),
+                ) < ("1", "12", "0")
+                {
+                    eprintln!("Dumping the enqueued tasks reading them in obkv format...");
+                    let reader =
+                        DocumentsBatchReader::from_reader(content_file).with_context(|| {
+                            format!("While reading content file {:?}", content_file_uuid)
+                        })?;
+                    let (mut cursor, documents_batch_index) =
+                        reader.into_cursor_and_fields_index();
+                    while let Some(doc) = cursor.next_document().with_context(|| {
+                        format!("While iterating on content file {:?}", content_file_uuid)
+                    })? {
+                        dump_content_file
+                            .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
+                    }
+                } else {
+                    eprintln!(
+                        "Dumping the enqueued tasks reading them in JSON stream format..."
+                    );
+                    for document in
+                        serde_json::de::Deserializer::from_reader(content_file).into_iter()
+                    {
+                        let document = document.with_context(|| {
+                            format!("While reading content file {:?}", content_file_uuid)
+                        })?;
+                        dump_content_file.push_document(&document)?;
+                    }
                 }
                 dump_content_file.flush()?;
                 count += 1;
             }

View File

@@ -20,6 +20,34 @@ pub struct OfflineUpgrade {

 impl OfflineUpgrade {
     pub fn upgrade(self) -> anyhow::Result<()> {
+        // Adding a version?
+        //
+        // 1. Update the LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION.
+        // 2. Add new version to the upgrade list if necessary
+        // 3. Use `no_upgrade` as index for versions that are compatible.
+
+        if self.current_version == self.target_version {
+            println!("Database is already at the target version. Exiting.");
+            return Ok(());
+        }
+
+        if self.current_version > self.target_version {
+            bail!(
+                "Cannot downgrade from {}.{}.{} to {}.{}.{}. Downgrade not supported",
+                self.current_version.0,
+                self.current_version.1,
+                self.current_version.2,
+                self.target_version.0,
+                self.target_version.1,
+                self.target_version.2
+            );
+        }
+
+        const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
+        const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
+        const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
+        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
+
         let upgrade_list = [
             (
                 v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
@@ -32,6 +60,8 @@ impl OfflineUpgrade {
             (v1_12_to_v1_12_3, "1", "12", "3"),
         ];

+        let no_upgrade: usize = upgrade_list.len();
+
         let (current_major, current_minor, current_patch) = &self.current_version;

         let start_at = match (
@@ -42,9 +72,12 @@ impl OfflineUpgrade {
             ("1", "9", _) => 0,
             ("1", "10", _) => 1,
             ("1", "11", _) => 2,
-            ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3,
+            ("1", "12", "0" | "1" | "2") => 3,
+            ("1", "12", "3" | "4" | "5") => no_upgrade,
             _ => {
-                bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
+                bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
+                    FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
+                    LAST_SUPPORTED_UPGRADE_FROM_VERSION);
             }
         };
@@ -53,18 +86,28 @@ impl OfflineUpgrade {
         let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
             ("1", "10", _) => 0,
             ("1", "11", _) => 1,
-            ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2,
-            ("1", "12", "3") => 3,
+            ("1", "12", "0" | "1" | "2") => 2,
+            ("1", "12", "3" | "4" | "5") => 3,
             (major, _, _) if major.starts_with('v') => {
                 bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
             }
             _ => {
-                bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11")
+                bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
+                    FIRST_SUPPORTED_UPGRADE_TO_VERSION,
+                    LAST_SUPPORTED_UPGRADE_TO_VERSION);
             }
         };

         println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");

+        if start_at == no_upgrade {
+            println!("No upgrade operation to perform, writing VERSION file");
+            create_version_file(&self.db_path, target_major, target_minor, target_patch)
+                .context("while writing VERSION file after the upgrade")?;
+            println!("Success");
+            return Ok(());
+        }
+
         #[allow(clippy::needless_range_loop)]
         for index in start_at..=ends_at {
             let (func, major, minor, patch) = upgrade_list[index];

View File

@@ -219,12 +219,19 @@ impl<'a> FacetDistribution<'a> {
                 let facet_key = StrRefCodec::bytes_decode(facet_key).unwrap();

                 let key: (FieldId, _, &str) = (field_id, any_docid, facet_key);
-                let original_string = self
-                    .index
-                    .field_id_docid_facet_strings
-                    .get(self.rtxn, &key)?
-                    .unwrap()
-                    .to_owned();
+                let optional_original_string =
+                    self.index.field_id_docid_facet_strings.get(self.rtxn, &key)?;
+
+                let original_string = match optional_original_string {
+                    Some(original_string) => original_string.to_owned(),
+                    None => {
+                        tracing::error!(
+                            "Missing original facet string. Using the normalized facet {} instead",
+                            facet_key
+                        );
+                        facet_key.to_string()
+                    }
+                };

                 distribution.insert(original_string, nbr_docids);
                 if distribution.len() == self.max_values_per_facet {

View File

@@ -283,42 +283,60 @@ impl FacetedDocidsExtractor {
 }

 struct DelAddFacetValue<'doc> {
-    strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
+    strings: HashMap<
+        (FieldId, &'doc str),
+        Option<BVec<'doc, u8>>,
+        hashbrown::DefaultHashBuilder,
+        &'doc Bump,
+    >,
     f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
+    doc_alloc: &'doc Bump,
 }

 impl<'doc> DelAddFacetValue<'doc> {
     fn new(doc_alloc: &'doc Bump) -> Self {
-        Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc) }
+        Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc), doc_alloc }
     }

     fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
-        let cache = match kind {
-            FacetKind::String => &mut self.strings,
-            FacetKind::Number => &mut self.f64s,
-            _ => return,
-        };
-
-        let key = (fid, value);
-        if let Some(DelAdd::Deletion) = cache.get(&key) {
-            cache.remove(&key);
-        } else {
-            cache.insert(key, DelAdd::Addition);
+        match kind {
+            FacetKind::Number => {
+                let key = (fid, value);
+                if let Some(DelAdd::Deletion) = self.f64s.get(&key) {
+                    self.f64s.remove(&key);
+                } else {
+                    self.f64s.insert(key, DelAdd::Addition);
+                }
+            }
+            FacetKind::String => {
+                if let Ok(s) = std::str::from_utf8(&value) {
+                    let normalized = crate::normalize_facet(s);
+                    let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
+                    self.strings.insert((fid, truncated), Some(value));
+                }
+            }
+            _ => (),
         }
     }

     fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
-        let cache = match kind {
-            FacetKind::String => &mut self.strings,
-            FacetKind::Number => &mut self.f64s,
-            _ => return,
-        };
-
-        let key = (fid, value);
-        if let Some(DelAdd::Addition) = cache.get(&key) {
-            cache.remove(&key);
-        } else {
-            cache.insert(key, DelAdd::Deletion);
+        match kind {
+            FacetKind::Number => {
+                let key = (fid, value);
+                if let Some(DelAdd::Addition) = self.f64s.get(&key) {
+                    self.f64s.remove(&key);
+                } else {
+                    self.f64s.insert(key, DelAdd::Deletion);
+                }
+            }
+            FacetKind::String => {
+                if let Ok(s) = std::str::from_utf8(&value) {
+                    let normalized = crate::normalize_facet(s);
+                    let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
+                    self.strings.insert((fid, truncated), None);
+                }
+            }
+            _ => (),
         }
     }
@@ -329,18 +347,14 @@ impl<'doc> DelAddFacetValue<'doc> {
         doc_alloc: &Bump,
     ) -> crate::Result<()> {
         let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
-        for ((fid, value), deladd) in self.strings {
-            if let Ok(s) = std::str::from_utf8(&value) {
-                buffer.clear();
-                buffer.extend_from_slice(&fid.to_be_bytes());
-                buffer.extend_from_slice(&docid.to_be_bytes());
-                let normalized = crate::normalize_facet(s);
-                let truncated = truncate_str(&normalized);
-                buffer.extend_from_slice(truncated.as_bytes());
-                match deladd {
-                    DelAdd::Deletion => sender.delete_facet_string(&buffer)?,
-                    DelAdd::Addition => sender.write_facet_string(&buffer, &value)?,
-                }
-            }
-        }
+        for ((fid, truncated), value) in self.strings {
+            buffer.clear();
+            buffer.extend_from_slice(&fid.to_be_bytes());
+            buffer.extend_from_slice(&docid.to_be_bytes());
+            buffer.extend_from_slice(truncated.as_bytes());
+            match &value {
+                Some(value) => sender.write_facet_string(&buffer, value)?,
+                None => sender.delete_facet_string(&buffer)?,
+            }
+        }

View File

@@ -153,10 +153,12 @@ pub fn write_from_bbqueue(
                 }
                 (key, None) => match database.delete(wtxn, key) {
                     Ok(false) => {
-                        unreachable!(
-                            "We tried to delete an unknown key from {database_name}: {:?}",
-                            key.as_bstr()
-                        )
+                        tracing::error!(
+                            database_name,
+                            key_bytes = ?key,
+                            formatted_key = ?key.as_bstr(),
+                            "Attempt to delete an unknown key"
+                        );
                     }
                     Ok(_) => (),
                     Err(error) => {
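
The hunk above swaps a hard `unreachable!` panic for a structured tracing event. As a rough standalone sketch of that field syntax (assuming the `tracing`, `tracing-subscriber`, and `bstr` crates as dependencies; the values are made up), `?value` records a field with its `Debug` representation while a bare identifier captures the variable under its own name:

use bstr::ByteSlice; // assumed here only for `as_bstr`, mirroring the original call site

fn main() {
    // Install a subscriber so the event is actually printed (to stderr by default).
    tracing_subscriber::fmt().init();

    let database_name = "word_docids";
    let key: &[u8] = b"missing-key";
    tracing::error!(
        database_name,                  // captured by name
        key_bytes = ?key,               // recorded with its Debug representation
        formatted_key = ?key.as_bstr(), // bstr renders the raw bytes readably
        "Attempt to delete an unknown key"
    );
}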