mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 06:44:27 +01:00
Merge #4544
4544: Stream documents r=curquiza a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/4383

### Perf
2M hackernews:
- main: Time to retrieve: 7s, RAM consumption: 2+GiB
- stream: Time to retrieve: 4.7s, RAM consumption: Too small

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
59ecf1cea7
38
Cargo.lock
generated
38
Cargo.lock
generated
@ -378,9 +378,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "arroy"
|
name = "arroy"
|
||||||
version = "0.2.0"
|
version = "0.3.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "efddeb1e7c32a551cc07ef4c3e181e3cd5478fdaf4f0bd799983171c1f6efe57"
|
checksum = "73897699bf04bac935c0b120990d2a511e91e563e0f9769f9c8bb983d98dfbc9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
@ -1536,9 +1536,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "doxygen-rs"
|
name = "doxygen-rs"
|
||||||
version = "0.2.2"
|
version = "0.4.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bff670ea0c9bbb8414e7efa6e23ebde2b8f520a7eef78273a3918cf1903e7505"
|
checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"phf",
|
"phf",
|
||||||
]
|
]
|
||||||
@ -2262,12 +2262,11 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "heed"
|
name = "heed"
|
||||||
version = "0.20.0-alpha.9"
|
version = "0.20.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9648a50991c86df7d00c56c268c27754fcf4c80be2ba57fc4a00dc928c6fe934"
|
checksum = "6f7acb9683d7c7068aa46d47557bfa4e35a277964b350d9504a87b03610163fd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags 2.5.0",
|
"bitflags 2.5.0",
|
||||||
"bytemuck",
|
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"heed-traits",
|
"heed-traits",
|
||||||
"heed-types",
|
"heed-types",
|
||||||
@ -2281,15 +2280,15 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "heed-traits"
|
name = "heed-traits"
|
||||||
version = "0.20.0-alpha.9"
|
version = "0.20.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5ab0b7d9cde969ad36dde692e487dc89d97f7168bf6a7bd3b894ad4bf7278298"
|
checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "heed-types"
|
name = "heed-types"
|
||||||
version = "0.20.0-alpha.9"
|
version = "0.20.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f0cb3567a7363f28b597bf6e9897b9466397951dd0e52df2c8196dd8a71af44a"
|
checksum = "3cb0d6ba3700c9a57e83c013693e3eddb68a6d9b6781cacafc62a0d992e8ddb3"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bincode",
|
"bincode",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
@ -3189,14 +3188,13 @@ checksum = "f9d642685b028806386b2b6e75685faadd3eb65a85fff7df711ce18446a422da"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lmdb-master-sys"
|
name = "lmdb-master-sys"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "629c123f5321b48fa4f8f4d3b868165b748d9ba79c7103fb58e3a94f736bcedd"
|
checksum = "dc9048db3a58c0732d7236abc4909058f9d2708cfb6d7d047eb895fddec6419a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cc",
|
"cc",
|
||||||
"doxygen-rs",
|
"doxygen-rs",
|
||||||
"libc",
|
"libc",
|
||||||
"pkg-config",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -3348,6 +3346,7 @@ dependencies = [
|
|||||||
"rayon",
|
"rayon",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
"roaring",
|
||||||
"rustls 0.21.12",
|
"rustls 0.21.12",
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"segment",
|
"segment",
|
||||||
@ -4416,12 +4415,6 @@ dependencies = [
|
|||||||
"winreg",
|
"winreg",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "retain_mut"
|
|
||||||
version = "0.1.7"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ring"
|
name = "ring"
|
||||||
version = "0.17.8"
|
version = "0.17.8"
|
||||||
@ -4439,13 +4432,12 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "roaring"
|
name = "roaring"
|
||||||
version = "0.10.2"
|
version = "0.10.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873"
|
checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"retain_mut",
|
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -785,10 +785,12 @@ impl IndexScheduler {
|
|||||||
let dst = temp_snapshot_dir.path().join("auth");
|
let dst = temp_snapshot_dir.path().join("auth");
|
||||||
fs::create_dir_all(&dst)?;
|
fs::create_dir_all(&dst)?;
|
||||||
// TODO We can't use the open_auth_store_env function here but we should
|
// TODO We can't use the open_auth_store_env function here but we should
|
||||||
let auth = milli::heed::EnvOpenOptions::new()
|
let auth = unsafe {
|
||||||
.map_size(1024 * 1024 * 1024) // 1 GiB
|
milli::heed::EnvOpenOptions::new()
|
||||||
.max_dbs(2)
|
.map_size(1024 * 1024 * 1024) // 1 GiB
|
||||||
.open(&self.auth_path)?;
|
.max_dbs(2)
|
||||||
|
.open(&self.auth_path)
|
||||||
|
}?;
|
||||||
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||||
|
|
||||||
// 5. Copy and tarball the flat snapshot
|
// 5. Copy and tarball the flat snapshot
|
||||||
|
@ -453,10 +453,12 @@ impl IndexScheduler {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let env = heed::EnvOpenOptions::new()
|
let env = unsafe {
|
||||||
.max_dbs(11)
|
heed::EnvOpenOptions::new()
|
||||||
.map_size(budget.task_db_size)
|
.max_dbs(11)
|
||||||
.open(options.tasks_path)?;
|
.map_size(budget.task_db_size)
|
||||||
|
.open(options.tasks_path)
|
||||||
|
}?;
|
||||||
|
|
||||||
let features = features::FeatureData::new(&env, options.instance_features)?;
|
let features = features::FeatureData::new(&env, options.instance_features)?;
|
||||||
|
|
||||||
@ -585,9 +587,9 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
|
fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
|
||||||
if let Ok(env) =
|
if let Ok(env) = unsafe {
|
||||||
heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
|
heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
|
||||||
{
|
} {
|
||||||
env.prepare_for_closing().wait();
|
env.prepare_for_closing().wait();
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
|
@ -49,7 +49,7 @@ pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env>
|
|||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(AUTH_STORE_SIZE); // 1GB
|
options.map_size(AUTH_STORE_SIZE); // 1GB
|
||||||
options.max_dbs(2);
|
options.max_dbs(2);
|
||||||
options.open(path)
|
unsafe { options.open(path) }
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HeedAuthStore {
|
impl HeedAuthStore {
|
||||||
|
@ -423,7 +423,6 @@ impl ErrorCode for HeedError {
|
|||||||
HeedError::Mdb(_)
|
HeedError::Mdb(_)
|
||||||
| HeedError::Encoding(_)
|
| HeedError::Encoding(_)
|
||||||
| HeedError::Decoding(_)
|
| HeedError::Decoding(_)
|
||||||
| HeedError::InvalidDatabaseTyping
|
|
||||||
| HeedError::DatabaseClosing
|
| HeedError::DatabaseClosing
|
||||||
| HeedError::BadOpenOptions { .. } => Code::Internal,
|
| HeedError::BadOpenOptions { .. } => Code::Internal,
|
||||||
}
|
}
|
||||||
|
@ -108,6 +108,7 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
|
|||||||
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
||||||
tracing-actix-web = "0.7.9"
|
tracing-actix-web = "0.7.9"
|
||||||
build-info = { version = "1.7.0", path = "../build-info" }
|
build-info = { version = "1.7.0", path = "../build-info" }
|
||||||
|
roaring = "0.10.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.9.0"
|
actix-rt = "2.9.0"
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
use std::io::ErrorKind;
|
use std::io::{ErrorKind, Write};
|
||||||
|
|
||||||
use actix_web::http::header::CONTENT_TYPE;
|
use actix_web::http::header::CONTENT_TYPE;
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
||||||
use bstr::ByteSlice as _;
|
use bstr::ByteSlice as _;
|
||||||
|
use bytes::Bytes;
|
||||||
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||||
use deserr::Deserr;
|
use deserr::Deserr;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use futures_util::Stream;
|
||||||
use index_scheduler::{IndexScheduler, TaskId};
|
use index_scheduler::{IndexScheduler, TaskId};
|
||||||
use meilisearch_types::deserr::query_params::Param;
|
use meilisearch_types::deserr::query_params::Param;
|
||||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||||
@ -22,7 +24,9 @@ use meilisearch_types::tasks::KindWithContent;
|
|||||||
use meilisearch_types::{milli, Document, Index};
|
use meilisearch_types::{milli, Document, Index};
|
||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use serde::Deserialize;
|
use roaring::RoaringBitmap;
|
||||||
|
use serde::ser::SerializeSeq;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
@ -230,6 +234,34 @@ pub async fn get_documents(
|
|||||||
documents_by_query(&index_scheduler, index_uid, query)
|
documents_by_query(&index_scheduler, index_uid, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct Writer2Streamer {
|
||||||
|
sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Write for Writer2Streamer {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stream(
|
||||||
|
data: impl Serialize + Send + 'static,
|
||||||
|
) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
|
||||||
|
let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
|
||||||
|
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
|
||||||
|
});
|
||||||
|
futures_util::stream::unfold(receiver, |mut receiver| async {
|
||||||
|
receiver.recv().await.map(|value| (value, receiver))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn documents_by_query(
|
fn documents_by_query(
|
||||||
index_scheduler: &IndexScheduler,
|
index_scheduler: &IndexScheduler,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
@ -239,12 +271,13 @@ fn documents_by_query(
|
|||||||
let BrowseQuery { offset, limit, fields, filter } = query;
|
let BrowseQuery { offset, limit, fields, filter } = query;
|
||||||
|
|
||||||
let index = index_scheduler.index(&index_uid)?;
|
let index = index_scheduler.index(&index_uid)?;
|
||||||
let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?;
|
let documents = retrieve_documents(index, offset, limit, filter, fields)?;
|
||||||
|
|
||||||
let ret = PaginationView::new(offset, limit, total as usize, documents);
|
let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents);
|
||||||
|
|
||||||
debug!(returns = ?ret, "Get documents");
|
debug!(returns = ?ret, "Get documents");
|
||||||
Ok(HttpResponse::Ok().json(ret))
|
|
||||||
|
Ok(HttpResponse::Ok().streaming(stream(ret)))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug, Deserr)]
|
#[derive(Deserialize, Debug, Deserr)]
|
||||||
@ -590,14 +623,47 @@ fn some_documents<'a, 't: 'a>(
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retrieve_documents<S: AsRef<str>>(
|
pub struct DocumentsStreamer {
|
||||||
index: &Index,
|
attributes_to_retrieve: Option<Vec<String>>,
|
||||||
|
documents: RoaringBitmap,
|
||||||
|
rtxn: RoTxn<'static>,
|
||||||
|
index: Index,
|
||||||
|
pub total_documents: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Serialize for DocumentsStreamer {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: serde::Serializer,
|
||||||
|
{
|
||||||
|
let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
|
||||||
|
|
||||||
|
let documents = some_documents(&self.index, &self.rtxn, self.documents.iter()).unwrap();
|
||||||
|
for document in documents {
|
||||||
|
let document = document.unwrap();
|
||||||
|
let document = match self.attributes_to_retrieve {
|
||||||
|
Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values(
|
||||||
|
&document,
|
||||||
|
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
||||||
|
),
|
||||||
|
None => document,
|
||||||
|
};
|
||||||
|
|
||||||
|
seq.serialize_element(&document)?;
|
||||||
|
}
|
||||||
|
seq.end()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn retrieve_documents(
|
||||||
|
index: Index,
|
||||||
offset: usize,
|
offset: usize,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
filter: Option<Value>,
|
filter: Option<Value>,
|
||||||
attributes_to_retrieve: Option<Vec<S>>,
|
attributes_to_retrieve: Option<Vec<String>>,
|
||||||
) -> Result<(u64, Vec<Document>), ResponseError> {
|
) -> Result<DocumentsStreamer, ResponseError> {
|
||||||
let rtxn = index.read_txn()?;
|
let rtxn = index.static_read_txn()?;
|
||||||
|
|
||||||
let filter = &filter;
|
let filter = &filter;
|
||||||
let filter = if let Some(filter) = filter {
|
let filter = if let Some(filter) = filter {
|
||||||
parse_filter(filter)
|
parse_filter(filter)
|
||||||
@ -607,7 +673,7 @@ fn retrieve_documents<S: AsRef<str>>(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let candidates = if let Some(filter) = filter {
|
let candidates = if let Some(filter) = filter {
|
||||||
filter.evaluate(&rtxn, index).map_err(|err| match err {
|
filter.evaluate(&rtxn, &index).map_err(|err| match err {
|
||||||
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
||||||
ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
|
ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
|
||||||
}
|
}
|
||||||
@ -617,27 +683,13 @@ fn retrieve_documents<S: AsRef<str>>(
|
|||||||
index.documents_ids(&rtxn)?
|
index.documents_ids(&rtxn)?
|
||||||
};
|
};
|
||||||
|
|
||||||
let (it, number_of_documents) = {
|
Ok(DocumentsStreamer {
|
||||||
let number_of_documents = candidates.len();
|
total_documents: candidates.len(),
|
||||||
(
|
attributes_to_retrieve,
|
||||||
some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
|
documents: candidates.into_iter().skip(offset).take(limit).collect(),
|
||||||
number_of_documents,
|
rtxn,
|
||||||
)
|
index,
|
||||||
};
|
})
|
||||||
|
|
||||||
let documents: Result<Vec<_>, ResponseError> = it
|
|
||||||
.map(|document| {
|
|
||||||
Ok(match &attributes_to_retrieve {
|
|
||||||
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
|
|
||||||
&document?,
|
|
||||||
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
|
||||||
),
|
|
||||||
None => document?,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
Ok((number_of_documents, documents?))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retrieve_document<S: AsRef<str>>(
|
fn retrieve_document<S: AsRef<str>>(
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
@ -124,20 +125,31 @@ pub struct Pagination {
|
|||||||
pub limit: usize,
|
pub limit: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Clone, Serialize)]
|
||||||
pub struct PaginationView<T> {
|
pub struct PaginationView<T: Serialize> {
|
||||||
pub results: Vec<T>,
|
pub results: T,
|
||||||
pub offset: usize,
|
pub offset: usize,
|
||||||
pub limit: usize,
|
pub limit: usize,
|
||||||
pub total: usize,
|
pub total: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Hand-written `Debug`: prints the pagination metadata and a fixed `"[...]"`
/// placeholder in place of the results, so `T` does not need to be `Debug`.
impl<T: Serialize> fmt::Debug for PaginationView<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { offset, limit, total, results: _ } = self;

        let mut view = f.debug_struct("PaginationView");
        view.field("offset", offset);
        view.field("limit", limit);
        view.field("total", total);
        view.field("results", &"[...]");
        view.finish()
    }
}
|
||||||
|
|
||||||
impl Pagination {
|
impl Pagination {
|
||||||
/// Given the full data to paginate, returns the selected section.
|
/// Given the full data to paginate, returns the selected section.
|
||||||
pub fn auto_paginate_sized<T>(
|
pub fn auto_paginate_sized<T>(
|
||||||
self,
|
self,
|
||||||
content: impl IntoIterator<Item = T> + ExactSizeIterator,
|
content: impl IntoIterator<Item = T> + ExactSizeIterator,
|
||||||
) -> PaginationView<T>
|
) -> PaginationView<Vec<T>>
|
||||||
where
|
where
|
||||||
T: Serialize,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
@ -151,7 +163,7 @@ impl Pagination {
|
|||||||
self,
|
self,
|
||||||
total: usize,
|
total: usize,
|
||||||
content: impl IntoIterator<Item = T>,
|
content: impl IntoIterator<Item = T>,
|
||||||
) -> PaginationView<T>
|
) -> PaginationView<Vec<T>>
|
||||||
where
|
where
|
||||||
T: Serialize,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
@ -161,7 +173,7 @@ impl Pagination {
|
|||||||
|
|
||||||
/// Given the data already paginated + the total number of elements, it stores
|
/// Given the data already paginated + the total number of elements, it stores
|
||||||
/// everything in a [PaginationResult].
|
/// everything in a [PaginationResult].
|
||||||
pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<T>
|
pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<Vec<T>>
|
||||||
where
|
where
|
||||||
T: Serialize,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
@ -169,8 +181,8 @@ impl Pagination {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> PaginationView<T> {
|
impl<T: Serialize> PaginationView<T> {
|
||||||
pub fn new(offset: usize, limit: usize, total: usize, results: Vec<T>) -> Self {
|
pub fn new(offset: usize, limit: usize, total: usize, results: T) -> Self {
|
||||||
Self { offset, limit, results, total }
|
Self { offset, limit, results, total }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,9 +80,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
/// Clears the task queue located at `db_path`.
|
/// Clears the task queue located at `db_path`.
|
||||||
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
|
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
|
||||||
let path = db_path.join("tasks");
|
let path = db_path.join("tasks");
|
||||||
let env = EnvOpenOptions::new()
|
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&path) }
|
||||||
.max_dbs(100)
|
|
||||||
.open(&path)
|
|
||||||
.with_context(|| format!("While trying to open {:?}", path.display()))?;
|
.with_context(|| format!("While trying to open {:?}", path.display()))?;
|
||||||
|
|
||||||
eprintln!("Deleting tasks from the database...");
|
eprintln!("Deleting tasks from the database...");
|
||||||
@ -193,9 +191,7 @@ fn export_a_dump(
|
|||||||
FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?;
|
FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?;
|
||||||
|
|
||||||
let index_scheduler_path = db_path.join("tasks");
|
let index_scheduler_path = db_path.join("tasks");
|
||||||
let env = EnvOpenOptions::new()
|
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
|
||||||
.max_dbs(100)
|
|
||||||
.open(&index_scheduler_path)
|
|
||||||
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
|
||||||
|
|
||||||
eprintln!("Dumping the keys...");
|
eprintln!("Dumping the keys...");
|
||||||
|
@ -30,7 +30,7 @@ grenad = { version = "0.4.6", default-features = false, features = [
|
|||||||
"rayon",
|
"rayon",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
] }
|
] }
|
||||||
heed = { version = "0.20.0-alpha.9", default-features = false, features = [
|
heed = { version = "0.20.1", default-features = false, features = [
|
||||||
"serde-json",
|
"serde-json",
|
||||||
"serde-bincode",
|
"serde-bincode",
|
||||||
"read-txn-no-tls",
|
"read-txn-no-tls",
|
||||||
@ -82,7 +82,7 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls",
|
|||||||
] }
|
] }
|
||||||
tiktoken-rs = "0.5.8"
|
tiktoken-rs = "0.5.8"
|
||||||
liquid = "0.26.4"
|
liquid = "0.26.4"
|
||||||
arroy = "0.2.0"
|
arroy = "0.3.1"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
ureq = { version = "2.9.7", features = ["json"] }
|
ureq = { version = "2.9.7", features = ["json"] }
|
||||||
|
3
milli/fuzz/.gitignore
vendored
Normal file
3
milli/fuzz/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
target
|
||||||
|
corpus
|
||||||
|
artifacts
|
@ -48,8 +48,6 @@ pub enum InternalError {
|
|||||||
GrenadInvalidFormatVersion,
|
GrenadInvalidFormatVersion,
|
||||||
#[error("Invalid merge while processing {process}")]
|
#[error("Invalid merge while processing {process}")]
|
||||||
IndexingMergingKeys { process: &'static str },
|
IndexingMergingKeys { process: &'static str },
|
||||||
#[error("{}", HeedError::InvalidDatabaseTyping)]
|
|
||||||
InvalidDatabaseTyping,
|
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
RayonThreadPool(#[from] ThreadPoolBuildError),
|
RayonThreadPool(#[from] ThreadPoolBuildError),
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
@ -429,7 +427,6 @@ impl From<HeedError> for Error {
|
|||||||
// TODO use the encoding
|
// TODO use the encoding
|
||||||
HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })),
|
HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })),
|
||||||
HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })),
|
HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })),
|
||||||
HeedError::InvalidDatabaseTyping => InternalError(InvalidDatabaseTyping),
|
|
||||||
HeedError::DatabaseClosing => InternalError(DatabaseClosing),
|
HeedError::DatabaseClosing => InternalError(DatabaseClosing),
|
||||||
HeedError::BadOpenOptions { .. } => UserError(InvalidLmdbOpenOptions),
|
HeedError::BadOpenOptions { .. } => UserError(InvalidLmdbOpenOptions),
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ impl Index {
|
|||||||
|
|
||||||
options.max_dbs(25);
|
options.max_dbs(25);
|
||||||
|
|
||||||
let env = options.open(path)?;
|
let env = unsafe { options.open(path) }?;
|
||||||
let mut wtxn = env.write_txn()?;
|
let mut wtxn = env.write_txn()?;
|
||||||
let main = env.database_options().name(MAIN).create(&mut wtxn)?;
|
let main = env.database_options().name(MAIN).create(&mut wtxn)?;
|
||||||
let word_docids = env.create_database(&mut wtxn, Some(WORD_DOCIDS))?;
|
let word_docids = env.create_database(&mut wtxn, Some(WORD_DOCIDS))?;
|
||||||
@ -294,6 +294,11 @@ impl Index {
|
|||||||
self.env.read_txn()
|
self.env.read_txn()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a static read transaction to be able to read the index without keeping a reference to it.
|
||||||
|
pub fn static_read_txn(&self) -> heed::Result<RoTxn<'static>> {
|
||||||
|
self.env.clone().static_read_txn()
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the canonicalized path where the heed `Env` of this `Index` lives.
|
/// Returns the canonicalized path where the heed `Env` of this `Index` lives.
|
||||||
pub fn path(&self) -> &Path {
|
pub fn path(&self) -> &Path {
|
||||||
self.env.path()
|
self.env.path()
|
||||||
|
@ -379,7 +379,7 @@ pub(crate) mod test_helpers {
|
|||||||
let mut options = heed::EnvOpenOptions::new();
|
let mut options = heed::EnvOpenOptions::new();
|
||||||
let options = options.map_size(4096 * 4 * 1000 * 100);
|
let options = options.map_size(4096 * 4 * 1000 * 100);
|
||||||
let tempdir = tempfile::TempDir::new().unwrap();
|
let tempdir = tempfile::TempDir::new().unwrap();
|
||||||
let env = options.open(tempdir.path()).unwrap();
|
let env = unsafe { options.open(tempdir.path()) }.unwrap();
|
||||||
let mut wtxn = env.write_txn().unwrap();
|
let mut wtxn = env.write_txn().unwrap();
|
||||||
let content = env.create_database(&mut wtxn, None).unwrap();
|
let content = env.create_database(&mut wtxn, None).unwrap();
|
||||||
wtxn.commit().unwrap();
|
wtxn.commit().unwrap();
|
||||||
|
@ -556,7 +556,7 @@ where
|
|||||||
let writer_index = (embedder_index as u16) << 8;
|
let writer_index = (embedder_index as u16) << 8;
|
||||||
for k in 0..=u8::MAX {
|
for k in 0..=u8::MAX {
|
||||||
let writer =
|
let writer =
|
||||||
arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension)?;
|
arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension);
|
||||||
if writer.is_empty(wtxn)? {
|
if writer.is_empty(wtxn)? {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -661,7 +661,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
)?;
|
)?;
|
||||||
let writer_index = (embedder_index as u16) << 8;
|
let writer_index = (embedder_index as u16) << 8;
|
||||||
// FIXME: allow customizing distance
|
// FIXME: allow customizing distance
|
||||||
let writers: std::result::Result<Vec<_>, _> = (0..=u8::MAX)
|
let writers: Vec<_> = (0..=u8::MAX)
|
||||||
.map(|k| {
|
.map(|k| {
|
||||||
arroy::Writer::new(
|
arroy::Writer::new(
|
||||||
index.vector_arroy,
|
index.vector_arroy,
|
||||||
@ -670,7 +670,6 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let writers = writers?;
|
|
||||||
|
|
||||||
// remove vectors for docids we want them removed
|
// remove vectors for docids we want them removed
|
||||||
let merger = remove_vectors_builder.build();
|
let merger = remove_vectors_builder.build();
|
||||||
|
Loading…
Reference in New Issue
Block a user