use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use actix_web::ResponseError;
use meili_snap::snapshot;
use meilisearch::search_queue::SearchQueue;

/// Checks that dropping a permit frees a slot that the next search can take right away.
///
/// The queue is created with `NonZeroUsize::new(2)`; the test grabs two permits up front
/// and then twice releases one permit and acquires a replacement.
#[actix_rt::test]
async fn search_queue_register() {
    // Upper bound on how long any single permit acquisition may take in this test.
    const TIMEOUT: Duration = Duration::from_secs(1);

    let cores = NonZeroUsize::new(2).unwrap();
    let queue = SearchQueue::new(4, cores);

    // Occupy both slots.
    let first = tokio::time::timeout(TIMEOUT, queue.try_get_search_permit()).await;
    let first = first.expect("I should get a permit straight away").unwrap();
    let second = tokio::time::timeout(TIMEOUT, queue.try_get_search_permit()).await;
    // Bound to a named variable so the permit stays alive until the end of the test.
    let _second = second.expect("I should get a permit straight away").unwrap();

    // Releasing one slot must let a new search register.
    drop(first);
    let third = tokio::time::timeout(TIMEOUT, queue.try_get_search_permit()).await;
    let third = third.expect("I should get a permit straight away").unwrap();

    // Same thing a second time.
    drop(third);
    let fourth = tokio::time::timeout(TIMEOUT, queue.try_get_search_permit()).await;
    let _fourth = fourth.expect("I should get a permit straight away").unwrap();
}

/// Checks that a search request waits while every slot is taken and is granted a permit
/// as soon as the running search releases its own.
#[actix_rt::test]
async fn wait_till_cores_are_available() {
    let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));

    // First, use all the cores
    let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
        .await
        .expect("I should get a permit straight away")
        .unwrap();

    // A second request must still be pending when the timeout fires.
    let ret = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit()).await;
    assert!(ret.is_err(), "The capacity is full, we should not get a permit");

    let q = queue.clone();
    let task = tokio::task::spawn(async move { q.try_get_search_permit().await });

    // After dropping a permit the previously spawned task should be able to finish.
    drop(permit1);
    // Three results are nested here: the timeout, the task join, and the permit request
    // itself. All three are unwrapped so the test fails if the queued request is rejected
    // instead of being granted a permit.
    let _permit2 = tokio::time::timeout(Duration::from_secs(1), task)
        .await
        .expect("I should get a permit straight away")
        .unwrap()
        .expect("the waiting request should be granted a permit");
}

#[actix_rt::test]
async fn refuse_search_requests_when_queue_is_full() {
    let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap()));

    // First, use the whole capacity of the
    let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
        .await
        .expect("I should get a permit straight away")
        .unwrap();

    let q = queue.clone();
    let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });

    // Here the queue is full. By registering two new search requests the permit 2 and 3 should be thrown out
    let q = queue.clone();
    let _permit3 = tokio::task::spawn(async move { q.try_get_search_permit().await });

    let permit2 = tokio::time::timeout(Duration::from_secs(1), permit2)
        .await
        .expect("I should get a result straight away")
        .unwrap(); // task should end successfully

    let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err());
    let http_response = err.error_response();
    let mut headers: Vec<_> = http_response
        .headers()
        .iter()
        .map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string()))
        .collect();
    headers.sort();
    snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###);

    let err = serde_json::to_string_pretty(&err).unwrap();
    snapshot!(err, @r###"
    {
      "message": "Too many search requests running at the same time: 1. Retry after 10s.",
      "code": "too_many_search_requests",
      "type": "system",
      "link": "https://docs.meilisearch.com/errors#too_many_search_requests"
    }
    "###);
}

#[actix_rt::test]
async fn search_request_crashes_while_holding_permits() {
    let queue = Arc::new(SearchQueue::new(1, NonZeroUsize::new(1).unwrap()));

    let (send, recv) = tokio::sync::oneshot::channel();

    // This first request take a cpu
    let q = queue.clone();
    tokio::task::spawn(async move {
        let _permit = q.try_get_search_permit().await.unwrap();
        recv.await.unwrap();
        panic!("oops an unexpected crash happened")
    });

    // This second request waits in the queue till the first request finishes
    let q = queue.clone();
    let task = tokio::task::spawn(async move {
        let _permit = q.try_get_search_permit().await.unwrap();
    });

    // By sending something in the channel the request holding a CPU will panic and should lose its permit
    send.send(()).unwrap();

    // Then the second request should be able to process and finishes correctly without panic
    tokio::time::timeout(Duration::from_secs(1), task)
        .await
        .expect("I should get a permit straight away")
        .unwrap();

    // I should even be able to take second permit here
    let _permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
        .await
        .expect("I should get a permit straight away")
        .unwrap();
}

#[actix_rt::test]
async fn works_with_capacity_of_zero() {
    let queue = Arc::new(SearchQueue::new(0, NonZeroUsize::new(1).unwrap()));

    // First, use the whole capacity of the
    let permit1 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
        .await
        .expect("I should get a permit straight away")
        .unwrap();

    // then we should get an error if we try to register a second search request.
    let permit2 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
        .await
        .expect("I should get a result straight away");

    let err = meilisearch_types::error::ResponseError::from(permit2.unwrap_err());
    let http_response = err.error_response();
    let mut headers: Vec<_> = http_response
        .headers()
        .iter()
        .map(|(name, value)| (name.to_string(), value.to_str().unwrap().to_string()))
        .collect();
    headers.sort();
    snapshot!(format!("{headers:?}"), @r###"[("content-type", "application/json"), ("retry-after", "10")]"###);

    let err = serde_json::to_string_pretty(&err).unwrap();
    snapshot!(err, @r###"
    {
      "message": "Too many search requests running at the same time: 0. Retry after 10s.",
      "code": "too_many_search_requests",
      "type": "system",
      "link": "https://docs.meilisearch.com/errors#too_many_search_requests"
    }
    "###);

    drop(permit1);
    // After dropping the first permit we should be able to get a new permit
    let _permit3 = tokio::time::timeout(Duration::from_secs(1), queue.try_get_search_permit())
        .await
        .expect("I should get a permit straight away")
        .unwrap();
}