diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 803101e04..03a761bbe 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -37,7 +37,6 @@ use meilisearch_types::milli::vector::parsed_vectors::{ use meilisearch_types::milli::{self, Filter}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; -use meilisearch_types::zstd::dict::DecoderDictionary; use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; use roaring::RoaringBitmap; use time::macros::format_description; @@ -909,8 +908,7 @@ impl IndexScheduler { let mut index_dumper = dump.create_index(uid, &metadata)?; let fields_ids_map = index.fields_ids_map(&rtxn)?; - let dictionary = - index.document_compression_dictionary(&rtxn)?.map(DecoderDictionary::copy); + let dictionary = index.document_decompression_dictionary(&rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let embedding_configs = index.embedding_configs(&rtxn)?; let mut buffer = Vec::new(); diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d841b838c..9abc28a92 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -2465,12 +2465,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -2525,12 +2533,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -2904,12 +2920,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -2955,12 +2979,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3011,12 +3043,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3129,12 +3169,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3184,12 +3232,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3898,12 +3954,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3969,12 +4033,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4037,12 +4109,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4098,12 +4178,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4159,6 +4247,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"id"); @@ -4168,7 +4258,13 @@ mod tests { let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4220,6 +4316,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"id"); @@ -4229,7 +4327,13 @@ mod tests { let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4303,6 +4407,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"id"); @@ -4312,7 +4418,13 @@ mod tests { let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4389,6 +4501,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"paw"); @@ -4398,7 +4512,13 @@ mod tests { let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4468,6 +4588,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"doggoid"); @@ -4477,7 +4599,13 @@ mod tests { let documents = index .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5120,6 +5248,8 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); // Ensure the document have been inserted into the relevant bitamp let configs = index.embedding_configs(&rtxn).unwrap(); @@ -5139,8 +5269,12 @@ mod tests { assert_json_snapshot!(embeddings[&simple_hf_name][0] == lab_embed, @"true"); assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); - let doc = index.compressed_documents(&rtxn, std::iter::once(0)).unwrap()[0].1; + let (_id, compressed_doc) = + index.compressed_documents(&rtxn, std::iter::once(0)).unwrap().remove(0); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let doc = obkv_to_json( &[ fields_ids_map.id("doggo").unwrap(), @@ -5194,6 +5328,8 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); // Ensure the document have been inserted into the relevant bitamp let configs = index.embedding_configs(&rtxn).unwrap(); @@ -5216,8 +5352,12 @@ mod tests { // remained beagle assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); - let doc = index.compressed_documents(&rtxn, std::iter::once(0)).unwrap()[0].1; + let (_id, compressed_doc) = + index.compressed_documents(&rtxn, std::iter::once(0)).unwrap().remove(0); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let doc = obkv_to_json( &[ fields_ids_map.id("doggo").unwrap(), @@ -5309,12 +5449,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), name: "documents after initial push"); @@ -5348,12 +5496,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); // the all the vectors linked to the new specified embedder have been removed // Only the unknown embedders stays in the document DB @@ -5456,9 +5612,15 @@ mod tests { // the document with the id 3 should have its original embedding updated let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let docid = index.external_documents_ids.get(&rtxn, "3").unwrap().unwrap(); - let doc = index.documents(&rtxn, Some(docid)).unwrap()[0]; - let doc = obkv_to_json(&field_ids, &field_ids_map, doc.1).unwrap(); + let (_id, compressed_doc) = + index.compressed_documents(&rtxn, Some(docid)).unwrap().remove(0); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + let doc = obkv_to_json(&field_ids, &field_ids_map, doc).unwrap(); snapshot!(json_string!(doc), @r###" { "id": 3, @@ -5570,12 +5732,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"}]"###); let conf = index.embedding_configs(&rtxn).unwrap(); @@ -5610,12 +5780,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @"[]"); let conf = index.embedding_configs(&rtxn).unwrap(); @@ -5726,12 +5904,20 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel"}]"###); } @@ -5761,12 +5947,20 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"embeddings":[[0.0,0.0,0.0]],"regenerate":false}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"embeddings":[[1.0,1.0,1.0]],"regenerate":false}}}]"###); } @@ -5794,12 +5988,20 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); // FIXME: redaction diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index a1aff713b..8ed6b3d8b 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -20,7 +20,6 @@ use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors; use meilisearch_types::milli::DocumentId; use meilisearch_types::star_or::OptionStarOrList; use meilisearch_types::tasks::KindWithContent; -use meilisearch_types::zstd::dict::DecoderDictionary; use meilisearch_types::{milli, Document, Index}; use mime::Mime; use once_cell::sync::Lazy; @@ -604,7 +603,7 @@ fn some_documents<'a, 't: 'a>( retrieve_vectors: RetrieveVectors, ) -> Result> + 'a, ResponseError> { let fields_ids_map = index.fields_ids_map(rtxn)?; - let dictionary = index.document_compression_dictionary(rtxn)?.map(DecoderDictionary::copy); + let dictionary = index.document_decompression_dictionary(rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let embedding_configs = index.embedding_configs(rtxn)?; let mut buffer = Vec::new(); diff --git a/meilisearch/src/search.rs b/meilisearch/src/search.rs index e815b6a01..0a4f879a7 100644 --- a/meilisearch/src/search.rs +++ b/meilisearch/src/search.rs @@ -19,7 +19,6 @@ use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors; use meilisearch_types::milli::vector::Embedder; use meilisearch_types::milli::{FacetValueHit, OrderBy, SearchForFacetValues, TimeBudget}; use meilisearch_types::settings::DEFAULT_PAGINATION_MAX_TOTAL_HITS; -use meilisearch_types::zstd::dict::DecoderDictionary; use meilisearch_types::{milli, Document}; use milli::tokenizer::TokenizerBuilder; use milli::{ @@ -1124,18 +1123,16 @@ fn make_hits( formatter_builder.crop_marker(format.crop_marker); formatter_builder.highlight_prefix(format.highlight_pre_tag); formatter_builder.highlight_suffix(format.highlight_post_tag); - let compression_dictionary = - index.document_compression_dictionary(rtxn)?.map(DecoderDictionary::copy); + let decompression_dictionary = index.document_decompression_dictionary(rtxn)?; let mut buffer = Vec::new(); let mut documents = Vec::new(); let embedding_configs = index.embedding_configs(rtxn)?; let documents_iter = index.compressed_documents(rtxn, documents_ids)?; for ((id, compressed), score) in documents_iter.into_iter().zip(document_scores.into_iter()) { - let obkv = match compression_dictionary.as_ref() { - // TODO manage this unwrap correctly - Some(dict) => compressed.decompress_with(&mut buffer, dict).unwrap(), - None => compressed.as_non_compressed(), - }; + let obkv = compressed + .decompress_with_optional_dictionary(&mut buffer, decompression_dictionary.as_ref()) + // TODO use a better error? + .map_err(|e| MeilisearchHttpError::HeedError(e.into()))?; // First generate a document with all the displayed fields let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?; diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs index 9325d901c..8e3ca3101 100644 --- a/meilitool/src/main.rs +++ b/meilitool/src/main.rs @@ -260,6 +260,7 @@ fn export_a_dump( // 4. Dump the indexes let mut count = 0; + let mut buffer = Vec::new(); for result in index_mapping.iter(&rtxn)? { let (uid, uuid) = result?; let index_path = db_path.join("indexes").join(uuid.to_string()); @@ -268,6 +269,7 @@ fn export_a_dump( })?; let rtxn = index.read_txn()?; + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let metadata = IndexMetadata { uid: uid.to_owned(), primary_key: index.primary_key(&rtxn)?.map(String::from), @@ -281,7 +283,10 @@ fn export_a_dump( // 4.1. Dump the documents for ret in index.all_compressed_documents(&rtxn)? { - let (_id, doc) = ret?; + let (_id, compressed_doc) = ret?; + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?; index_dumper.push_document(&document)?; } diff --git a/milli/examples/search.rs b/milli/examples/search.rs index f570525dd..01f3322d5 100644 --- a/milli/examples/search.rs +++ b/milli/examples/search.rs @@ -30,6 +30,7 @@ fn main() -> Result<(), Box> { let index = Index::new(options, dataset)?; let txn = index.read_txn()?; + let dictionary = index.document_decompression_dictionary(&txn).unwrap(); let mut query = String::new(); while stdin().read_line(&mut query)? > 0 { for _ in 0..2 { @@ -49,6 +50,7 @@ fn main() -> Result<(), Box> { let start = Instant::now(); let mut ctx = SearchContext::new(&index, &txn)?; + let mut buffer = Vec::new(); let universe = filtered_universe(ctx.index, ctx.txn, &None)?; let docs = execute_search( @@ -75,11 +77,14 @@ fn main() -> Result<(), Box> { let elapsed = start.elapsed(); println!("new: {}us, docids: {:?}", elapsed.as_micros(), docs.documents_ids); if print_documents { - let documents = index + let compressed_documents = index .compressed_documents(&txn, docs.documents_ids.iter().copied()) .unwrap() .into_iter() - .map(|(id, obkv)| { + .map(|(id, compressed_obkv)| { + let obkv = compressed_obkv + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let mut object = serde_json::Map::default(); for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() { let value = obkv.get(fid).unwrap(); @@ -90,17 +95,20 @@ fn main() -> Result<(), Box> { }) .collect::>(); - for (id, document) in documents { + for (id, document) in compressed_documents { println!("{id}:"); println!("{document}"); } - let documents = index + let compressed_documents = index .compressed_documents(&txn, docs.documents_ids.iter().copied()) .unwrap() .into_iter() - .map(|(id, obkv)| { + .map(|(id, compressed_obkv)| { let mut object = serde_json::Map::default(); + let obkv = compressed_obkv + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() { let value = obkv.get(fid).unwrap(); let value: serde_json::Value = serde_json::from_slice(value).unwrap(); @@ -110,7 +118,7 @@ fn main() -> Result<(), Box> { }) .collect::>(); println!("{}us: {:?}", elapsed.as_micros(), docs.documents_ids); - for (id, document) in documents { + for (id, document) in compressed_documents { println!("{id}:"); println!("{document}"); } diff --git a/milli/src/heed_codec/compressed_obkv_codec.rs b/milli/src/heed_codec/compressed_obkv_codec.rs index 1603380d0..b6ddc5f3a 100644 --- a/milli/src/heed_codec/compressed_obkv_codec.rs +++ b/milli/src/heed_codec/compressed_obkv_codec.rs @@ -7,9 +7,6 @@ use obkv::KvReaderU16; use zstd::bulk::{Compressor, Decompressor}; use zstd::dict::{DecoderDictionary, EncoderDictionary}; -// TODO move that elsewhere -pub const COMPRESSION_LEVEL: i32 = 12; - pub struct CompressedObkvCodec; impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec { @@ -63,10 +60,13 @@ impl<'a> CompressedKvReaderU16<'a> { /// Decompresses this KvReader if necessary. pub fn decompress_with_optional_dictionary<'b>( - &'b self, + &self, buffer: &'b mut Vec, dictionary: Option<&DecoderDictionary>, - ) -> io::Result> { + ) -> io::Result> + where + 'a: 'b, + { match dictionary { Some(dict) => self.decompress_with(buffer, dict), None => Ok(self.as_non_compressed()), diff --git a/milli/src/heed_codec/mod.rs b/milli/src/heed_codec/mod.rs index 0e0098f7b..a221a0340 100644 --- a/milli/src/heed_codec/mod.rs +++ b/milli/src/heed_codec/mod.rs @@ -21,7 +21,7 @@ use thiserror::Error; pub use self::beu16_str_codec::BEU16StrCodec; pub use self::beu32_str_codec::BEU32StrCodec; pub use self::compressed_obkv_codec::{ - CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec, COMPRESSION_LEVEL, + CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec, }; pub use self::field_id_word_count_codec::FieldIdWordCountCodec; pub use self::fst_set_codec::FstSetCodec; diff --git a/milli/src/index.rs b/milli/src/index.rs index d33528b47..d13de2480 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -11,7 +11,7 @@ use roaring::RoaringBitmap; use rstar::RTree; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use zstd::dict::DecoderDictionary; +use zstd::dict::{DecoderDictionary, EncoderDictionary}; use crate::documents::PrimaryKey; use crate::error::{InternalError, UserError}; @@ -362,14 +362,30 @@ impl Index { self.main.remap_key_type::().delete(wtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY) } - /// Returns the optional dictionnary to be used when reading the OBKV documents. - pub fn document_compression_dictionary<'t>( + /// Returns the optional raw bytes dictionary to be used when reading or writing the OBKV documents. + pub fn document_compression_raw_dictionary<'t>( &self, rtxn: &'t RoTxn, ) -> heed::Result> { self.main.remap_types::().get(rtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY) } + pub fn document_decompression_dictionary<'t>( + &self, + rtxn: &'t RoTxn, + ) -> heed::Result>> { + self.document_compression_raw_dictionary(rtxn).map(|opt| opt.map(DecoderDictionary::copy)) + } + + pub fn document_compression_dictionary( + &self, + rtxn: &RoTxn, + ) -> heed::Result>> { + const COMPRESSION_LEVEL: i32 = 19; + self.document_compression_raw_dictionary(rtxn) + .map(|opt| opt.map(|bytes| EncoderDictionary::copy(bytes, COMPRESSION_LEVEL))) + } + /* documents ids */ /// Writes the documents ids that corresponds to the user-ids-documents-ids FST. @@ -1329,7 +1345,8 @@ impl Index { process: "external_id_of", }) })?; - let dictionary = self.document_compression_dictionary(rtxn)?.map(DecoderDictionary::copy); + let dictionary = + self.document_compression_raw_dictionary(rtxn)?.map(DecoderDictionary::copy); let mut buffer = Vec::new(); Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> { let (_docid, compressed_obkv) = entry?; @@ -2445,11 +2462,12 @@ pub(crate) mod tests { "###); let rtxn = index.read_txn().unwrap(); - let dictionary = index.document_compression_dictionary(&rtxn).unwrap(); - let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap()[0]; + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); + let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap().remove(0); let mut buffer = Vec::new(); - let obkv = - compressed_obkv.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap(); + let obkv = compressed_obkv + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap(); insta::assert_debug_snapshot!(json, @r###" { @@ -2458,9 +2476,10 @@ pub(crate) mod tests { "###); // Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34 - let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap()[0]; - let obkv = - compressed_obkv.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap(); + let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap().remove(0); + let obkv = compressed_obkv + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap(); insta::assert_debug_snapshot!(json, @r###" { @@ -2469,6 +2488,7 @@ pub(crate) mod tests { } "###); + drop(dictionary); drop(rtxn); // Add new documents again @@ -2667,11 +2687,16 @@ pub(crate) mod tests { } = search.execute().unwrap(); let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap(); documents_ids.sort_unstable(); - let docs = index.compressed_documents(&rtxn, documents_ids).unwrap(); + let compressed_docs = index.compressed_documents(&rtxn, documents_ids).unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); + let mut buffer = Vec::new(); let mut all_ids = HashSet::new(); - for (_docid, obkv) in docs { - let id = obkv.get(primary_key_id).unwrap(); - assert!(all_ids.insert(id)); + for (_docid, compressed) in compressed_docs { + let doc = compressed + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + let id = doc.get(primary_key_id).unwrap(); + assert!(all_ids.insert(id.to_vec())); } } diff --git a/milli/src/search/new/tests/mod.rs b/milli/src/search/new/tests/mod.rs index cc9741024..dd5660b61 100644 --- a/milli/src/search/new/tests/mod.rs +++ b/milli/src/search/new/tests/mod.rs @@ -25,10 +25,11 @@ fn collect_field_values( let mut values = vec![]; let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap(); let mut buffer = Vec::new(); - let dictionary = index.document_compression_dictionary(txn).unwrap(); + let dictionary = index.document_decompression_dictionary(txn).unwrap(); for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() { - let doc = - compressed_doc.decompress_with_optional_dictionary(&mut buffer, dictionary).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); if let Some(v) = doc.get(fid) { let v: serde_json::Value = serde_json::from_slice(v).unwrap(); let v = v.to_string(); diff --git a/milli/src/snapshot_tests.rs b/milli/src/snapshot_tests.rs index faf30fc09..b08d5d028 100644 --- a/milli/src/snapshot_tests.rs +++ b/milli/src/snapshot_tests.rs @@ -407,13 +407,13 @@ pub fn snap_documents(index: &Index) -> String { let rtxn = index.read_txn().unwrap(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let display = fields_ids_map.ids().collect::>(); - let dictionary = index.document_compression_dictionary(&rtxn).unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let mut buffer = Vec::new(); for result in index.all_compressed_documents(&rtxn).unwrap() { let (_id, compressed_document) = result.unwrap(); let document = compressed_document - .decompress_with_optional_dictionary(&mut buffer, dictionary) + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) .unwrap(); let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap(); snap.push_str(&serde_json::to_string(&doc).unwrap()); diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 64bf72beb..5be748abf 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -21,7 +21,6 @@ use serde::{Deserialize, Serialize}; use slice_group_by::GroupBy; use tracing::debug; use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; -use zstd::dict::EncoderDictionary; use self::enrich::enrich_documents_batch; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; @@ -35,7 +34,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; -use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec, COMPRESSION_LEVEL}; +use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec}; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ @@ -784,8 +783,8 @@ where // TODO make this 64_000 const let dictionary = zstd::dict::from_continuous(&sample_data, &sample_sizes, 64_000)?; self.index.put_document_compression_dictionary(self.wtxn, &dictionary)?; - // TODO use declare the level 3 as a const - let dictionary = EncoderDictionary::copy(&dictionary, COMPRESSION_LEVEL); + // safety: We just set the dictionary above, it must be there when we get it back. + let dictionary = self.index.document_compression_dictionary(self.wtxn)?.unwrap(); // TODO do not remap types here but rather expose the &[u8] for the KvReaderU16 let mut iter = self.index.documents.iter_mut(self.wtxn)?; @@ -901,6 +900,7 @@ mod tests { #[test] fn simple_document_merge() { let mut index = TempIndex::new(); + let mut buffer = Vec::new(); index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; // First we send 3 documents with duplicate ids and @@ -919,16 +919,21 @@ mod tests { assert_eq!(count, 1); // Check that we get only one document from the database. - let docs = index.compressed_documents(&rtxn, Some(0)).unwrap(); - assert_eq!(docs.len(), 1); - let (id, doc) = docs[0]; + let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap(); + assert_eq!(compressed_docs.len(), 1); + let (id, compressed_doc) = compressed_docs.remove(0); assert_eq!(id, 0); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); // Check that this document is equal to the last one sent. let mut doc_iter = doc.iter(); assert_eq!(doc_iter.next(), Some((0, &b"1"[..]))); assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..]))); assert_eq!(doc_iter.next(), None); + drop(dictionary); drop(rtxn); // Second we send 1 document with id 1, to force it to be merged with the previous one. @@ -940,10 +945,14 @@ mod tests { assert_eq!(count, 1); // Check that we get only one document from the database. - let docs = index.compressed_documents(&rtxn, Some(0)).unwrap(); - assert_eq!(docs.len(), 1); - let (id, doc) = docs[0]; + let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap(); + assert_eq!(compressed_docs.len(), 1); + let (id, compressed_doc) = compressed_docs.remove(0); assert_eq!(id, 0); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); // Check that this document is equal to the last one sent. let mut doc_iter = doc.iter(); @@ -951,6 +960,7 @@ mod tests { assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..]))); assert_eq!(doc_iter.next(), Some((2, &b"25"[..]))); assert_eq!(doc_iter.next(), None); + drop(dictionary); drop(rtxn); } @@ -975,6 +985,7 @@ mod tests { #[test] fn simple_auto_generated_documents_ids() { let mut index = TempIndex::new(); + let mut buffer = Vec::new(); index.index_documents_config.autogenerate_docids = true; // First we send 3 documents with ids from 1 to 3. index @@ -987,12 +998,26 @@ mod tests { // Check that there is 3 documents now. let rtxn = index.read_txn().unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 3); - let docs = index.compressed_documents(&rtxn, vec![0, 1, 2]).unwrap(); - let (_id, obkv) = docs.iter().find(|(_id, kv)| kv.get(0) == Some(br#""kevin""#)).unwrap(); + let compressed_docs = index.compressed_documents(&rtxn, vec![0, 1, 2]).unwrap(); + let (_id, compressed_obkv) = compressed_docs + .iter() + .find(|(_id, compressed_doc)| { + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + doc.get(0) == Some(br#""kevin""#) + }) + .unwrap(); + + let obkv = compressed_obkv + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let kevin_uuid: String = serde_json::from_slice(obkv.get(1).unwrap()).unwrap(); + drop(dictionary); drop(rtxn); // Second we send 1 document with the generated uuid, to erase the previous ones. @@ -1000,21 +1025,34 @@ mod tests { // Check that there is **always** 3 documents. let rtxn = index.read_txn().unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap(); assert_eq!(count, 3); // the document 0 has been deleted and reinserted with the id 3 - let docs = index.compressed_documents(&rtxn, vec![1, 2, 0]).unwrap(); - let kevin_position = - docs.iter().position(|(_, d)| d.get(0).unwrap() == br#""updated kevin""#).unwrap(); + let mut compressed_docs = index.compressed_documents(&rtxn, vec![1, 2, 0]).unwrap(); + let kevin_position = compressed_docs + .iter() + .position(|(_, compressed_doc)| { + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + + doc.get(0).unwrap() == br#""updated kevin""# + }) + .unwrap(); assert_eq!(kevin_position, 2); - let (_, doc) = docs[kevin_position]; + let (_, compressed_doc) = compressed_docs.remove(kevin_position); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); // Check that this document is equal to the last // one sent and that an UUID has been generated. assert_eq!(doc.get(0), Some(&br#""updated kevin""#[..])); // This is an UUID, it must be 36 bytes long plus the 2 surrounding string quotes ("). assert_eq!(doc.get(1).unwrap().len(), 36 + 2); + drop(dictionary); drop(rtxn); } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 506eb2949..2fdfb2f08 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -11,7 +11,6 @@ use obkv::{KvReader, KvReaderU16, KvWriter}; use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; -use zstd::dict::DecoderDictionary; use super::helpers::{ create_sorter, create_writer, keep_first, obkvs_keep_last_addition_merge_deletions, @@ -169,8 +168,7 @@ impl<'a, 'i> Transform<'a, 'i> { let external_documents_ids = self.index.external_documents_ids(); let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; - let dictionary = - self.index.document_compression_dictionary(wtxn)?.map(DecoderDictionary::copy); + let dictionary = self.index.document_decompression_dictionary(wtxn)?; let primary_key = cursor.primary_key().to_string(); let primary_key_id = self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; @@ -350,9 +348,12 @@ impl<'a, 'i> Transform<'a, 'i> { documents_seen: documents_count, }); + drop(dictionary); + self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; self.index.put_primary_key(wtxn, &primary_key)?; self.documents_count += documents_count; + // Now that we have a valid sorter that contains the user id and the obkv we // give it to the last transforming function which returns the TransformOutput. Ok(documents_count) @@ -1037,8 +1038,7 @@ impl<'a, 'i> Transform<'a, 'i> { if original_sorter.is_some() || flattened_sorter.is_some() { let modified_faceted_fields = settings_diff.modified_faceted_fields(); - let dictionary = - self.index.document_compression_dictionary(wtxn)?.map(DecoderDictionary::copy); + let dictionary = self.index.document_decompression_dictionary(wtxn)?; let mut original_obkv_buffer = Vec::new(); let mut flattened_obkv_buffer = Vec::new(); diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index cb0c372f1..afcc7fc9e 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -10,7 +10,6 @@ use heed::types::Bytes; use heed::{BytesDecode, RwTxn}; use obkv::{KvReader, KvWriter}; use roaring::RoaringBitmap; -use zstd::dict::EncoderDictionary; use super::helpers::{ self, keep_first, merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, @@ -20,7 +19,7 @@ use super::helpers::{ use super::MergeFn; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::facet::FacetType; -use crate::heed_codec::{CompressedKvWriterU16, COMPRESSION_LEVEL}; +use crate::heed_codec::CompressedKvWriterU16; use crate::index::db_name::DOCUMENTS; use crate::index::IndexEmbeddingConfig; use crate::proximity::MAX_DISTANCE; @@ -164,10 +163,7 @@ pub(crate) fn write_typed_chunk_into_index( .into_iter() .map(|IndexEmbeddingConfig { name, .. }| name) .collect(); - // TODO declare the compression ratio as a const - let dictionary = index - .document_compression_dictionary(wtxn)? - .map(|dict| EncoderDictionary::copy(dict, COMPRESSION_LEVEL)); + let dictionary = index.document_compression_dictionary(wtxn)?; let mut vectors_buffer = Vec::new(); while let Some((key, reader)) = iter.next()? { let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index 5905d775e..cbe8040f9 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -1765,6 +1765,8 @@ mod tests { // Check that the searchable field is correctly set to "name" only. let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); // When we search for something that is not in // the searchable fields it must not return any document. let result = index.search(&rtxn).query("23").execute().unwrap(); @@ -1773,10 +1775,17 @@ mod tests { // When we search for something that is in the searchable fields // we must find the appropriate document. let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap(); - let documents = index.compressed_documents(&rtxn, result.documents_ids).unwrap(); + let mut compressed_documents = + index.compressed_documents(&rtxn, result.documents_ids).unwrap(); let fid_map = index.fields_ids_map(&rtxn).unwrap(); - assert_eq!(documents.len(), 1); - assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); + assert_eq!(compressed_documents.len(), 1); + let (_id, compressed_document) = compressed_documents.remove(0); + let document = compressed_document + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + + assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); + drop(dictionary); drop(rtxn); // We change the searchable fields to be the "name" field only. @@ -1801,6 +1810,7 @@ mod tests { // Check that the searchable field have been reset and documents are found now. let rtxn = index.read_txn().unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let fid_map = index.fields_ids_map(&rtxn).unwrap(); let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn).unwrap(); snapshot!(format!("{user_defined_searchable_fields:?}"), @"None"); @@ -1809,8 +1819,13 @@ mod tests { snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###); let result = index.search(&rtxn).query("23").execute().unwrap(); assert_eq!(result.documents_ids.len(), 1); - let documents = index.compressed_documents(&rtxn, result.documents_ids).unwrap(); - assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); + let mut compressed_documents = + index.compressed_documents(&rtxn, result.documents_ids).unwrap(); + let (_id, compressed_document) = compressed_documents.remove(0); + let document = compressed_document + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); } #[test] @@ -1945,15 +1960,20 @@ mod tests { // Check that the displayed fields are correctly set. let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let fields_ids = index.filterable_fields(&rtxn).unwrap(); assert_eq!(fields_ids, hashset! { S("age") }); // Only count the field_id 0 and level 0 facet values. // TODO we must support typed CSVs for numbers to be understood. let fidmap = index.fields_ids_map(&rtxn).unwrap(); - for document in index.all_compressed_documents(&rtxn).unwrap() { - let document = document.unwrap(); - let json = crate::obkv_to_json(&fidmap.ids().collect::>(), &fidmap, document.1) + for result in index.all_compressed_documents(&rtxn).unwrap() { + let (_id, compressed_document) = result.unwrap(); + let document = compressed_document + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) .unwrap(); + let json = + crate::obkv_to_json(&fidmap.ids().collect::>(), &fidmap, document).unwrap(); println!("json: {:?}", json); } let count = index @@ -1964,6 +1984,7 @@ mod tests { .unwrap() .count(); assert_eq!(count, 3); + drop(dictionary); drop(rtxn); // Index a little more documents with new and current facets values. @@ -2053,6 +2074,7 @@ mod tests { #[test] fn set_asc_desc_field() { let mut index = TempIndex::new(); + let mut buffer = Vec::new(); index.index_documents_config.autogenerate_docids = true; // Set the filterable fields to be the age. @@ -2074,12 +2096,16 @@ mod tests { // Run an empty query just to ensure that the search results are ordered. let rtxn = index.read_txn().unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap(); - let documents = index.compressed_documents(&rtxn, documents_ids).unwrap(); + let compressed_documents = index.compressed_documents(&rtxn, documents_ids).unwrap(); // Fetch the documents "age" field in the ordre in which the documents appear. let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap(); - let iter = documents.into_iter().map(|(_, doc)| { + let iter = compressed_documents.into_iter().map(|(_, compressed_doc)| { + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let bytes = doc.get(age_field_id).unwrap(); let string = std::str::from_utf8(bytes).unwrap(); string.parse::().unwrap() @@ -2476,6 +2502,7 @@ mod tests { #[test] fn setting_impact_relevancy() { let mut index = TempIndex::new(); + let mut buffer = Vec::new(); index.index_documents_config.autogenerate_docids = true; // Set the genres setting @@ -2509,7 +2536,11 @@ mod tests { let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap(); let first_id = documents_ids[0]; let documents = index.compressed_documents(&rtxn, documents_ids).unwrap(); - let (_, content) = documents.iter().find(|(id, _)| *id == first_id).unwrap(); + let (_, compressed_content) = documents.iter().find(|(id, _)| *id == first_id).unwrap(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); + let content = compressed_content + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap(); let line = std::str::from_utf8(content.get(fid).unwrap()).unwrap(); diff --git a/milli/tests/search/query_criteria.rs b/milli/tests/search/query_criteria.rs index 59c5e0523..57c66e422 100644 --- a/milli/tests/search/query_criteria.rs +++ b/milli/tests/search/query_criteria.rs @@ -317,8 +317,20 @@ fn criteria_ascdesc() { wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); - let documents = - index.all_compressed_documents(&rtxn).unwrap().map(|doc| doc.unwrap()).collect::>(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); + let mut buffers = vec![Vec::new(); index.number_of_documents(&rtxn).unwrap() as usize]; + let documents = index + .all_compressed_documents(&rtxn) + .unwrap() + .zip(buffers.iter_mut()) + .map(|(compressed, buffer)| { + let (id, compressed) = compressed.unwrap(); + let doc = compressed + .decompress_with_optional_dictionary(buffer, dictionary.as_ref()) + .unwrap(); + (id, doc) + }) + .collect::>(); for criterion in [Asc(S("name")), Desc(S("name")), Asc(S("age")), Desc(S("age"))] { eprintln!("Testing with criterion: {:?}", &criterion);