use std::{convert::TryInto, io::ErrorKind};
use async_compression::tokio::bufread;
use aws_smithy_types::byte_stream::ByteStream;
use futures::{stream, stream::StreamExt, TryStreamExt};
use snafu::Snafu;
use tokio_util::io::StreamReader;
use vector_lib::codecs::decoding::{
    DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions,
};
use vector_lib::codecs::NewlineDelimitedDecoderConfig;
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::owned_value_path;
use vrl::value::{kind::Collection, Kind};
use super::util::MultilineConfig;
use crate::codecs::DecodingConfig;
use crate::{
    aws::{auth::AwsAuthentication, create_client, create_client_and_region, RegionOrEndpoint},
    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
    config::{
        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
    },
    line_agg,
    serde::{bool_or_struct, default_decoding},
    tls::TlsConfig,
};
// SQS-driven ingestion. This file uses its `Config` and `Ingestor` types, and the integration
// tests additionally use its `S3Event` type.
pub mod sqs;
/// Compression scheme of the objects read from S3.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Determine the compression scheme per object.
    ///
    /// The scheme is taken from, in order of preference, the object's content encoding, its
    /// content type, and the extension of its key (for example, `.gz`). When none of these
    /// identifies a known scheme, the object is read as uncompressed.
    #[derivative(Default)]
    Auto,
    /// Uncompressed: the object body is read as-is.
    None,
    /// Gzip; multi-member streams are decoded.
    Gzip,
    /// Zstandard; multi-frame streams are decoded.
    Zstd,
}
/// Strategy used to find the S3 objects to ingest.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
enum Strategy {
    /// Ingest through the SQS-based ingestor, which requires the `sqs` configuration to be set.
    ///
    /// This is the only strategy and therefore also the default.
    #[derivative(Default)]
    Sqs,
}
/// Configuration for the `aws_s3` source.
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    // Region and/or endpoint settings, flattened into the top level of this configuration.
    // The same values are used for both the S3 client and the SQS client.
    #[serde(flatten)]
    region: RegionOrEndpoint,
    /// The compression scheme used for decompressing objects retrieved from S3.
    compression: Compression,
    /// The strategy used to consume objects from S3.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,
    /// Configuration options for SQS.
    ///
    /// Required when `strategy` is `sqs`; the source fails to build without it.
    sqs: Option<sqs::Config>,
    /// Deprecated role setting.
    // NOTE(review): nothing in this file reads this field; presumably it is kept so that existing
    // configurations still parse under `deny_unknown_fields` — confirm.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,
    // Authentication settings, used for both the S3 client and the SQS client.
    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,
    // Optional multiline aggregation settings; converted into a `line_agg::Config` at build time.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,
    // Accepts either a bare boolean or the full struct form (`bool_or_struct`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
    // TLS settings for the S3 client only; the SQS client uses `sqs.tls_options` instead.
    #[configurable(derived)]
    tls_options: Option<TlsConfig>,
    /// Per-source log namespace override; when unset, the globally configured namespace applies.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
    // Framing applied to the decompressed object bytes; defaults to newline-delimited framing
    // without a maximum length.
    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,
    // Deserializer applied to each frame; also determines the source's output type and base
    // schema definition.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,
}
/// Default framing for this source: newline-delimited frames with no maximum length.
const fn default_framing() -> FramingConfig {
    let newline_delimited = NewlineDelimitedDecoderOptions { max_length: None };
    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig { newline_delimited })
}
// Macro defined elsewhere in the crate. Judging by its name, it implements config generation for
// `AwsS3Config` on top of its `Default` implementation — confirm against the macro definition.
impl_generate_config_from_default!(AwsS3Config);
#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SourceConfig for AwsS3Config {
    /// Builds the source future for this configuration.
    ///
    /// Resolves the effective log namespace, converts the optional multiline settings into a
    /// `line_agg::Config`, creates the ingestor for the configured strategy, and returns its
    /// pinned `run` future.
    ///
    /// # Errors
    ///
    /// Fails when the multiline settings cannot be converted or when the ingestor cannot be
    /// created.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        // Only convert the multiline settings when they are present; a conversion error aborts
        // the build.
        let multiline_config: Option<line_agg::Config> = match self.multiline.as_ref() {
            Some(multiline) => Some(multiline.try_into()?),
            None => None,
        };

        match self.strategy {
            Strategy::Sqs => {
                let ingestor = self
                    .create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
                    .await?;
                Ok(Box::pin(ingestor.run(
                    cx,
                    self.acknowledgements,
                    log_namespace,
                )))
            }
        }
    }

    /// Describes the single output of this source.
    ///
    /// The schema starts from the decoder's definition and adds the source metadata fields
    /// `bucket`, `object`, `region`, `timestamp`, the standard Vector source metadata, and an
    /// optional `metadata` object of byte values. Under the legacy namespace, unknown fields are
    /// additionally typed as bytes.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);

        let definition = self.decoding.schema_definition(log_namespace);

        // Byte-valued metadata, each registered with a `LegacyKey::Overwrite` key of the same
        // name.
        let definition = definition.with_source_metadata(
            Self::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
            &owned_value_path!("bucket"),
            Kind::bytes(),
            None,
        );
        let definition = definition.with_source_metadata(
            Self::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!("object"))),
            &owned_value_path!("object"),
            Kind::bytes(),
            None,
        );
        let definition = definition.with_source_metadata(
            Self::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!("region"))),
            &owned_value_path!("region"),
            Kind::bytes(),
            None,
        );

        // The timestamp has no legacy key and carries the "timestamp" meaning.
        let definition = definition
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata();

        // Free-form `metadata` object with byte values; it may be absent.
        let definition = definition.with_source_metadata(
            Self::NAME,
            None,
            &owned_value_path!("metadata"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
            None,
        );

        let definition = if log_namespace == LogNamespace::Legacy {
            definition.unknown_fields(Kind::bytes())
        } else {
            definition
        };

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            definition,
        )]
    }

    /// This source always supports acknowledgements.
    fn can_acknowledge(&self) -> bool {
        true
    }
}
impl AwsS3Config {
    /// Creates the SQS ingestor together with the S3 and SQS clients it needs.
    ///
    /// The S3 client and the decoder are created first, so failures there are reported before a
    /// missing `sqs` section is.
    ///
    /// # Errors
    ///
    /// Fails when either client cannot be created, when the decoder cannot be built, when the
    /// `sqs` configuration is absent (`CreateSqsIngestorError::ConfigMissing`), or when
    /// `sqs::Ingestor::new` fails.
    async fn create_sqs_ingestor(
        &self,
        multiline: Option<line_agg::Config>,
        proxy: &ProxyConfig,
        log_namespace: LogNamespace,
    ) -> crate::Result<sqs::Ingestor> {
        let region = self.region.region();
        let endpoint = self.region.endpoint();

        // The S3 client uses the top-level TLS options. The final `&None` sits where the SQS
        // client receives `sqs.timeout` — presumably a timeout; confirm against `create_client`.
        let s3_client = create_client::<S3ClientBuilder>(
            &self.auth,
            region.clone(),
            endpoint.clone(),
            proxy,
            &self.tls_options,
            &None,
        )
        .await?;

        let decoding_config =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace);
        let decoder = decoding_config.build()?;

        let Some(sqs_config) = self.sqs.as_ref() else {
            return Err(CreateSqsIngestorError::ConfigMissing {}.into());
        };

        // The SQS client has its own TLS options and timeout; it also hands back the region that
        // the ingestor is constructed with.
        let (sqs_client, resolved_region) = create_client_and_region::<SqsClientBuilder>(
            &self.auth,
            region.clone(),
            endpoint,
            proxy,
            &sqs_config.tls_options,
            &sqs_config.timeout,
        )
        .await?;

        let ingestor = sqs::Ingestor::new(
            resolved_region,
            sqs_client,
            s3_client,
            sqs_config.clone(),
            self.compression,
            multiline,
            decoder,
        )
        .await?;

        Ok(ingestor)
    }
}
/// Errors raised while creating the SQS ingestor.
#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    /// The configuration has no `sqs` section, which the SQS ingestor cannot be built without.
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}
async fn s3_object_decoder(
    compression: Compression,
    key: &str,
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    mut body: ByteStream,
) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
    let first = if let Some(first) = body.next().await {
        first
    } else {
        return Box::new(tokio::io::empty());
    };
    let r = tokio::io::BufReader::new(StreamReader::new(
        stream::iter(Some(first))
            .chain(Box::pin(async_stream::stream! {
                while let Some(next) = body.next().await {
                    yield next;
                }
            }))
            .map_err(|e| std::io::Error::new(ErrorKind::Other, e)),
    ));
    let compression = match compression {
        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
        _ => compression,
    };
    use Compression::*;
    match compression {
        Auto => unreachable!(), None => Box::new(r),
        Gzip => Box::new({
            let mut decoder = bufread::GzipDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
        Zstd => Box::new({
            let mut decoder = bufread::ZstdDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
    }
}
/// Picks the compression scheme of an object from the hints available for it.
///
/// The hints are consulted in order — content encoding, then content type, then the extension of
/// `key` — and the first one that maps to a known scheme wins. Returns `None` when no hint does.
fn determine_compression(
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    key: &str,
) -> Option<Compression> {
    if let Some(scheme) = content_encoding.and_then(content_encoding_to_compression) {
        return Some(scheme);
    }

    if let Some(scheme) = content_type.and_then(content_type_to_compression) {
        return Some(scheme);
    }

    object_key_to_compression(key)
}
/// Maps a `Content-Encoding` value to the compression scheme it names.
///
/// HTTP content codings are case-insensitive, and `x-gzip` is an alias of `gzip`, so the value is
/// compared without regard to ASCII case after trimming surrounding whitespace. Exact-case
/// matching would leave such objects undetected and have them read as uncompressed.
///
/// Only a single coding is recognized; a comma-separated list of codings yields `None`, as does
/// any coding other than gzip or zstd.
fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
    let coding = content_encoding.trim();

    if coding.eq_ignore_ascii_case("gzip") || coding.eq_ignore_ascii_case("x-gzip") {
        Some(Compression::Gzip)
    } else if coding.eq_ignore_ascii_case("zstd") {
        Some(Compression::Zstd)
    } else {
        None
    }
}
/// Maps a `Content-Type` value to the compression scheme it implies.
///
/// Media types are case-insensitive and may be followed by `; name=value` parameters, so only the
/// part before the first `;` is considered, trimmed and compared without regard to ASCII case.
/// Exact matching would miss values such as `Application/GZIP` or
/// `application/gzip; charset=binary`.
///
/// Returns `None` for any media type other than `application/gzip`, `application/x-gzip`, or
/// `application/zstd`.
fn content_type_to_compression(content_type: &str) -> Option<Compression> {
    let media_type = content_type
        .split_once(';')
        .map_or(content_type, |(essence, _parameters)| essence)
        .trim();
    let is = |candidate: &str| media_type.eq_ignore_ascii_case(candidate);

    if is("application/gzip") || is("application/x-gzip") {
        Some(Compression::Gzip)
    } else if is("application/zstd") {
        Some(Compression::Zstd)
    } else {
        None
    }
}
fn object_key_to_compression(key: &str) -> Option<Compression> {
    let extension = std::path::Path::new(key)
        .extension()
        .and_then(std::ffi::OsStr::to_str);
    use Compression::*;
    extension.and_then(|extension| match extension {
        "gz" => Some(Gzip),
        "zst" => Some(Zstd),
        _ => Option::None,
    })
}
#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    /// Table-driven check of compression detection: every hint source on its own for both gzip
    /// and zstd, the precedence between sources, and fall-through from unrecognized values.
    // The test shares its name with the function under test, hence the explicit `super::` path.
    #[test]
    fn determine_compression() {
        // (key, content_encoding, content_type, expected)
        let cases: &[(&str, Option<&str>, Option<&str>, Option<Compression>)] = &[
            // Each source on its own.
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            ("out.log", Some("zstd"), None, Some(Compression::Zstd)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            (
                "out.log",
                None,
                Some("application/x-gzip"),
                Some(Compression::Gzip),
            ),
            (
                "out.log",
                None,
                Some("application/zstd"),
                Some(Compression::Zstd),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.log.zst", None, None, Some(Compression::Zstd)),
            ("out.txt", None, None, None),
            // Content encoding wins over content type and key.
            (
                "out.log.zst",
                Some("gzip"),
                Some("application/zstd"),
                Some(Compression::Gzip),
            ),
            // Content type wins over key.
            (
                "out.log.gz",
                None,
                Some("application/zstd"),
                Some(Compression::Zstd),
            ),
            // Unrecognized values fall through to the next source.
            (
                "out.log.gz",
                Some("identity"),
                Some("text/plain"),
                Some(Compression::Gzip),
            ),
            ("out.txt", Some("identity"), Some("text/plain"), None),
        ];

        for &(key, content_encoding, content_type, expected) in cases {
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={:?} content_encoding={:?} content_type={:?}",
                key,
                content_encoding,
                content_type,
            );
        }
    }

    /// An empty body must decode to no bytes without error, even when the hints say gzip.
    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut reader = s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await;

        let mut data = Vec::new();
        reader.read_to_end(&mut data).await.unwrap();

        assert!(data.is_empty());
    }
}
#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::{
        any::Any,
        collections::HashMap,
        fs::File,
        io::{self, BufRead},
        path::Path,
        time::Duration,
    };
    use aws_sdk_s3::Client as S3Client;
    use aws_sdk_sqs::{types::QueueAttributeName, Client as SqsClient};
    use similar_asserts::assert_eq;
    use vector_lib::codecs::{decoding::DeserializerConfig, JsonDeserializerConfig};
    use vector_lib::lookup::path;
    use vrl::value::Value;
    use super::*;
    use crate::{
        aws::{create_client, AwsAuthentication, RegionOrEndpoint},
        common::sqs::SqsClientBuilder,
        config::{ProxyConfig, SourceConfig, SourceContext},
        event::EventStatus::{self, *},
        line_agg,
        sources::{
            aws_s3::{sqs::S3Event, S3ClientBuilder},
            util::MultilineConfig,
        },
        test_util::{
            collect_n,
            components::{assert_source_compliance, SOURCE_TAGS},
            lines_from_gzip_file, random_lines, trace_init,
        },
        SourceSender,
    };
    /// Reads the file at `path` and returns its lines without their line terminators.
    ///
    /// Panics when the file cannot be opened or a line cannot be read.
    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
        let reader = io::BufReader::new(File::open(path).unwrap());

        let mut lines = Vec::new();
        for line in reader.lines() {
            lines.push(line.unwrap());
        }
        lines
    }
    /// A plain newline-delimited object yields one event per line, acknowledged as delivered.
    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// Each line is wrapped in a `{"message": ...}` JSON object and decoded with the JSON
    /// deserializer; the resulting events must carry the original lines.
    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut json_logs: Vec<String> = Vec::with_capacity(logs.len());
        for msg in &logs {
            json_logs.push(format!(r#"{{"message": "{}"}}"#, msg));
        }
        let payload = json_logs.join("\n").into_bytes();

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None, // custom_options
        )
        .await;
    }
    /// Same as the plain-message test, but with the log namespace enabled, in which case the
    /// event value itself is compared against each line.
    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Delivered,
            true, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// An object whose key contains spaces is ingested like any other.
    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = String::from("key with spaces");
        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        test_event(
            Some(key),
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// An object whose key contains a `:` is ingested like any other.
    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let unique = uuid::Uuid::new_v4();
        let key = format!("special:{}", unique);
        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        test_event(
            Some(key),
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// A gzip-compressed object uploaded with a `gzip` content encoding is decompressed before
    /// being split into lines.
    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        // Compress the joined lines in memory.
        let plaintext = io::Cursor::new(logs.join("\n").into_bytes());
        let mut encoder = flate2::read::GzEncoder::new(plaintext, flate2::Compression::fast());
        let mut compressed = Vec::new();
        encoder.read_to_end(&mut compressed).unwrap();

        test_event(
            None,         // key: generated by `test_event`
            Some("gzip"), // content_encoding
            None,         // content_type
            None,         // multiline
            compressed,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// Uploads the `multipart-gzip.log.gz` fixture as-is and expects exactly the lines that
    /// `lines_from_gzip_file` reads from the same file.
    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let fixture = "tests/data/multipart-gzip.log.gz";
        let logs = lines_from_gzip_file(fixture);

        let mut buffer = Vec::new();
        File::open(fixture)
            .expect("file can be opened")
            .read_to_end(&mut buffer)
            .expect("file can be read");

        test_event(
            None,         // key: generated by `test_event`
            Some("gzip"), // content_encoding
            None,         // content_type
            None,         // multiline
            buffer,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// Uploads the `multipart-zst.log.zst` fixture as-is and expects the lines of its plaintext
    /// counterpart, `multipart-zst.log`.
    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let mut buffer = Vec::new();
        File::open("tests/data/multipart-zst.log.zst")
            .expect("file can be opened")
            .read_to_end(&mut buffer)
            .expect("file can be read");

        test_event(
            None,         // key: generated by `test_event`
            Some("zstd"), // content_encoding
            None,         // content_type
            None,         // multiline
            buffer,
            logs, // expected_lines
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// With multiline aggregation starting at `abc` and halting with `geh`, the three uploaded
    /// lines must come out as a single event.
    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = ["abc", "def", "geh"]
            .iter()
            .map(|line| line.to_string())
            .collect();
        let payload = logs.join("\n").into_bytes();

        let multiline = MultilineConfig {
            start_pattern: "abc".to_owned(),
            mode: line_agg::Mode::HaltWith,
            condition_pattern: "geh".to_owned(),
            timeout_ms: Duration::from_millis(1000),
        };
        let expected_lines = vec!["abc\ndef\ngeh".to_owned()];

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            Some(multiline),
            payload,
            expected_lines,
            Delivered,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// When events are finalized as errored, the SQS message is expected to remain on the queue.
    // NOTE(review): this test is ignored and the reason is not recorded here — confirm whether
    // it can be re-enabled.
    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Errored,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// When events are finalized as rejected and `delete_failed_message` is left at its default,
    /// `test_event` expects the queue to end up empty unless that default turns out to be
    /// `false`.
    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Rejected,
            false, // log_namespace
            DeserializerConfig::Bytes,
            None, // custom_options
        )
        .await;
    }
    /// When events are finalized as rejected and `delete_failed_message` is disabled, the SQS
    /// message is expected to remain on the queue.
    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();
        let payload = logs.join("\n").into_bytes();

        // `test_event` reads this key and downcasts the value to `bool`.
        let custom_options: HashMap<String, Box<dyn Any>> = HashMap::from([(
            "delete_failed_message".to_string(),
            Box::new(false) as Box<dyn Any>,
        )]);

        test_event(
            None, // key: generated by `test_event`
            None, // content_encoding
            None, // content_type
            None, // multiline
            payload,
            logs, // expected_lines
            Rejected,
            false, // log_namespace
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }
    /// Endpoint used by the integration tests: the value of the `S3_ADDRESS` environment
    /// variable, or `http://localhost:4566` when it is unset or not valid Unicode.
    fn s3_address() -> String {
        match std::env::var("S3_ADDRESS") {
            Ok(address) => address,
            Err(_) => String::from("http://localhost:4566"),
        }
    }
    /// Builds the source configuration used by the integration tests.
    ///
    /// It targets region `us-east-1` at the test endpoint, uses automatic compression detection
    /// and the SQS strategy against `queue_url`, and enables acknowledgements. Everything not set
    /// here comes from the respective `Default` implementations.
    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        let sqs_config = sqs::Config {
            queue_url: queue_url.to_owned(),
            poll_secs: 1,
            max_number_of_messages: 10,
            visibility_timeout_secs: 0,
            client_concurrency: None,
            ..Default::default()
        };

        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs_config),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }
    /// End-to-end driver shared by the integration tests.
    ///
    /// Creates a fresh queue and bucket, uploads `payload` as an object, publishes a matching S3
    /// event notification to the queue, runs the source, and checks the emitted events and the
    /// final state of the queue.
    ///
    /// * `key` - object key; a random UUID is used when `None`.
    /// * `content_encoding` / `content_type` - set on the uploaded object when given.
    /// * `multiline` - multiline aggregation settings passed to the source configuration.
    /// * `payload` - raw bytes uploaded as the object body.
    /// * `expected_lines` - expected event messages, in order; also the number of events awaited.
    /// * `status` - status the test sender finalizes events with; it also decides how many
    ///   messages are expected to be left on the queue.
    /// * `log_namespace` - whether the log namespace is enabled for the source.
    /// * `decoding` - deserializer used by the source.
    /// * `custom_options` - extra switches; only `"delete_failed_message"` holding `false` is
    ///   acted upon.
    // Every argument varies between the tests, so they are passed positionally rather than
    // grouped.
    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
            let s3 = s3_client().await;
            let sqs = sqs_client().await;
            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;
            // NOTE(review): presumably gives the backing service time to make the new queue and
            // bucket available — confirm.
            tokio::time::sleep(Duration::from_secs(1)).await;
            let mut config = config(&queue, multiline, log_namespace, decoding);
            // Only an explicit `false` under "delete_failed_message" changes the configuration.
            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }
            // Upload the object under test.
            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");
            // A second SQS client is created here; the local shadows the `sqs_client` function
            // for the rest of this block.
            let sqs_client = sqs_client().await;
            // Template notification; its bucket name and object key are overwritten below.
            let mut s3_event: S3Event = serde_json::from_str(
            r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();
            // Point the notification at the object that was just uploaded.
            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);
            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();
            // Run the source in the background; the test sender finalizes events with `status`.
            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });
            let events = collect_n(rx, expected_lines.len()).await;
            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {
                // Every event must conform to the schema the source declares, when it has one.
                if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
                    schema_definition.is_valid_for_event(event).unwrap();
                }
                let message = expected_lines[i].as_str();
                let log = event.as_log();
                // With the log namespace enabled the event value is the message itself;
                // otherwise it lives under the `message` field.
                if log_namespace {
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket")).unwrap(), &bucket.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object")).unwrap(), &key.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region")).unwrap(), &"us-east-1".into());
            }
            // NOTE(review): presumably leaves time for acknowledgements to be processed and for
            // an undeleted message to become visible again — confirm.
            tokio::time::sleep(Duration::from_secs(10)).await;
            // Errored events, and rejected ones when deletion of failed messages is disabled,
            // must leave the message on the queue; everything else must leave the queue empty.
            match status {
                Errored => {
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        }).await;
    }
    /// Creates a queue with a random UUID name and a `VisibilityTimeout` attribute of `"2"`, and
    /// returns its URL.
    ///
    /// Panics when the queue cannot be created or the response carries no URL.
    async fn create_queue(client: &SqsClient) -> String {
        let name = uuid::Uuid::new_v4().to_string();

        let response = client
            .create_queue()
            .queue_name(name)
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        response.queue_url.expect("no queue url")
    }
    /// Returns how many messages a single receive call on `queue` hands back, or 0 when the
    /// response has no message list.
    ///
    /// The call passes `wait_time_seconds` through and uses a visibility timeout of 0.
    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
        let response = client
            .receive_message()
            .queue_url(queue)
            .visibility_timeout(0)
            .wait_time_seconds(wait_time_seconds)
            .send()
            .await
            .unwrap();

        response.messages.map_or(0, |messages| messages.len())
    }
    /// Creates a bucket with a random UUID name and returns that name.
    ///
    /// Panics when the bucket cannot be created.
    async fn create_bucket(client: &S3Client) -> String {
        let name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(name.as_str())
            .send()
            .await
            .expect("Could not create bucket");

        name
    }
    /// Creates an S3 client for region `us-east-1` at the test endpoint, using the test
    /// authentication and a default proxy configuration.
    ///
    /// Panics when the client cannot be created.
    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let target = RegionOrEndpoint {
            region: Some(String::from("us-east-1")),
            endpoint: Some(s3_address()),
        };
        let proxy = ProxyConfig::default();

        let client = create_client::<S3ClientBuilder>(
            &auth,
            target.region(),
            target.endpoint(),
            &proxy,
            &None,
            &None,
        )
        .await;

        client.unwrap()
    }
    /// Creates an SQS client for region `us-east-1`, using the test authentication and a default
    /// proxy configuration.
    ///
    /// The endpoint is the same `s3_address()` the S3 client uses. Panics when the client cannot
    /// be created.
    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let target = RegionOrEndpoint {
            region: Some(String::from("us-east-1")),
            endpoint: Some(s3_address()),
        };
        let proxy = ProxyConfig::default();

        let client = create_client::<SqsClientBuilder>(
            &auth,
            target.region(),
            target.endpoint(),
            &proxy,
            &None,
            &None,
        )
        .await;

        client.unwrap()
    }
}