use std::collections::HashMap;
use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock};
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_sqs::operation::delete_message_batch::{
    DeleteMessageBatchError, DeleteMessageBatchOutput,
};
use aws_sdk_sqs::operation::receive_message::ReceiveMessageError;
use aws_sdk_sqs::types::{DeleteMessageBatchRequestEntry, Message};
use aws_sdk_sqs::Client as SqsClient;
use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::codecs::decoding::FramingError;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
};
use crate::codecs::Decoder;
use crate::event::{Event, LogEvent};
use crate::{
    aws::AwsTimeout,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
    SourceSender,
};
use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
use vector_lib::event::MaybeAsLogMut;
use vector_lib::lookup::{metadata_path, path, PathPrefix};
/// Version requirement an S3 event record's version must satisfy before the
/// record is processed. `~2` accepts any `2.x` version.
///
/// The `unwrap` is on a fixed, valid requirement literal, so it cannot fail
/// at runtime.
static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());
/// SQS-related configuration for the S3 source.
///
/// Unknown fields are rejected during deserialization
/// (`deny_unknown_fields`).
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,
    /// How long each receive request waits for messages, in seconds.
    ///
    /// Passed as the wait time of every SQS receive call. Defaults to 15.
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,
    /// The visibility timeout requested for received messages, in seconds.
    ///
    /// Passed as the visibility timeout of every SQS receive call. Defaults
    /// to 300.
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,
    /// Whether to delete an SQS message once it has been processed
    /// successfully. Defaults to `true`.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,
    /// Whether a message whose events were rejected downstream is still
    /// treated as successfully processed (and therefore eligible for
    /// deletion). Defaults to `true`.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,
    /// Number of concurrent polling/processing tasks to spawn.
    ///
    /// When unset, `crate::num_threads()` is used.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,
    /// Maximum number of messages requested per receive call.
    ///
    /// Must be between 1 and 10 inclusive; other values are rejected when the
    /// ingestor is constructed. Defaults to 10.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,
    /// TLS options.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed
    /// where the SQS client is built. Confirm against the caller.
    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,
    /// Client timeout options, flattened into this configuration block.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed
    /// where the SQS client is built. Confirm against the caller.
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,
}
/// Default for `Config::poll_secs`: 15 seconds.
const fn default_poll_secs() -> u32 {
    15
}
/// Default for `Config::visibility_timeout_secs`: 300 seconds.
const fn default_visibility_timeout_secs() -> u32 {
    300
}
/// Default for `Config::max_number_of_messages`: 10, the largest value
/// accepted by `Ingestor::new`.
const fn default_max_number_of_messages() -> u32 {
    10
}
/// Serde/derivative default helper for boolean options that default to `true`.
const fn default_true() -> bool {
    true
}
/// Errors that can occur while constructing an [`Ingestor`].
#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    /// `max_number_of_messages` was outside the accepted `1..=10` range.
    #[snafu(display("Invalid value for max_number_of_messages {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}
/// Errors that can occur while processing a single SQS message and the S3
/// objects it refers to.
// The variants differ considerably in size (some wrap SDK errors); the lint
// is silenced rather than boxing them.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    /// The SQS message body could not be parsed as an S3 notification.
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    /// The `GetObject` request for the referenced object failed.
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    /// Framing the object body failed part-way through reading it.
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    /// The downstream pipeline was closed while events were being sent.
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    /// The notification refers to a region other than the one this source is
    /// configured for.
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    /// The event record's version does not satisfy
    /// `SUPPORTED_S3_EVENT_VERSION`.
    #[snafu(display("Unsupported S3 event version: {}.", version,))]
    UnsupportedS3EventVersion { version: semver::Version },
    /// End-to-end acknowledgement reported that the events were not
    /// delivered.
    #[snafu(display("Sink reported an error sending events"))]
    ErrorAcknowledgement,
}
/// Immutable configuration and clients shared (behind an `Arc`) by every
/// [`IngestorProcess`] worker.
pub struct State {
    // Region this source operates in; records from other regions are rejected.
    region: Region,
    // Client used to fetch the objects referenced by notifications.
    s3_client: S3Client,
    // Client used to receive and delete queue messages.
    sqs_client: SqsClient,
    // Optional multi-line aggregation applied to framed lines.
    multiline: Option<line_agg::Config>,
    // Compression setting handed to the object decoder.
    compression: super::Compression,
    // URL of the queue being polled.
    queue_url: String,
    // Wait time of each receive call, in seconds.
    poll_secs: i32,
    // Maximum messages requested per receive call (validated to 1..=10).
    max_number_of_messages: i32,
    // Number of worker tasks spawned by `Ingestor::run`.
    client_concurrency: usize,
    // Visibility timeout requested on receive, in seconds.
    visibility_timeout_secs: i32,
    // Delete messages after successful processing.
    delete_message: bool,
    // Treat downstream-rejected batches as processed.
    delete_failed_message: bool,
    // Framer and deserializer used to turn object bytes into events.
    decoder: Decoder,
}
/// Entry point of the SQS-driven ingestion: owns the shared [`State`] and
/// spawns the worker tasks that poll the queue.
pub(super) struct Ingestor {
    state: Arc<State>,
}
impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,
            s3_client,
            sqs_client,
            compression,
            multiline,
            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,
        });
        Ok(Ingestor { state })
    }
    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }
        for handle in handles.drain(..) {
            if let Err(e) = handle.await {
                if e.is_panic() {
                    panic::resume_unwind(e.into_panic());
                }
            }
        }
        Ok(())
    }
}
/// A single polling worker: receives SQS messages, fetches the referenced S3
/// objects, and forwards the decoded events downstream.
pub struct IngestorProcess {
    // Configuration and clients shared with the other workers.
    state: Arc<State>,
    // Channel into which decoded events are sent.
    out: SourceSender,
    // Resolves when the source should stop.
    shutdown: ShutdownSignal,
    // Whether end-to-end acknowledgements are awaited for each object.
    acknowledgements: bool,
    // Namespace deciding where source metadata is written on each event.
    log_namespace: LogNamespace,
    // Metric handle for bytes read from object bodies.
    bytes_received: Registered<BytesReceived>,
    // Metric handle for events produced.
    events_received: Registered<EventsReceived>,
}
impl IngestorProcess {
    /// Creates a worker around the shared `state`, registering its own
    /// received-bytes (HTTP protocol) and received-events metric handles.
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
        let events_received = register!(EventsReceived);

        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received,
            events_received,
        }
    }
    /// Repeats [`Self::run_once`] until the shutdown signal resolves.
    ///
    /// The shutdown future is raced against each iteration, so an iteration
    /// that is still in flight when shutdown fires is dropped rather than
    /// completed.
    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            let stop_requested = select! {
                _ = &mut shutdown => true,
                _ = self.run_once() => false,
            };
            if stop_requested {
                break;
            }
        }
    }
    async fn run_once(&mut self) {
        let messages = self.receive_messages().await;
        let messages = messages
            .inspect(|messages| {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
            })
            .inspect_err(|err| {
                emit!(SqsMessageReceiveError { error: err });
            })
            .unwrap_or_default();
        let mut delete_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };
            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id)
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    emit!(SqsMessageProcessingError {
                        message_id: &message_id,
                        error: &err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }
                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
    }
    /// Parses one SQS message and dispatches the S3 event it carries.
    ///
    /// If the body parses as an SNS notification envelope, the wrapped
    /// `Message` string is used as the payload; otherwise the body itself
    /// is. S3 test events are logged at debug level and treated as success.
    ///
    /// # Errors
    ///
    /// Returns [`ProcessingError::InvalidSqsMessage`] when the payload is not
    /// a recognised S3 notification, or whatever error processing the event
    /// records yields.
    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let raw_body = message.body.unwrap_or_default();
        let payload = match serde_json::from_str::<SnsNotification>(raw_body.as_ref()) {
            Ok(notification) => notification.message,
            Err(_) => raw_body,
        };

        let parsed = serde_json::from_str::<SqsEvent>(payload.as_ref()).context(
            InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            },
        )?;

        match parsed {
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
        }
    }
    /// Processes every record of an S3 event in order, stopping at the first
    /// record that fails and returning its error.
    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        let log_namespace = self.log_namespace;
        for record in s3_event.records {
            self.handle_s3_event_record(record, log_namespace).await?;
        }
        Ok(())
    }
    /// Fetches the object referenced by one S3 event record, decodes it into
    /// events, and sends them downstream.
    ///
    /// Records whose event kind is not `ObjectCreated` are reported through
    /// an internal event and skipped with `Ok(())`.
    ///
    /// When acknowledgements are enabled, the result also reflects the final
    /// delivery status of the batch: `Errored` is always an error, while
    /// `Rejected` is an error only if `delete_failed_message` is disabled.
    ///
    /// # Errors
    ///
    /// * [`ProcessingError::UnsupportedS3EventVersion`]: the record version
    ///   does not satisfy `SUPPORTED_S3_EVENT_VERSION`.
    /// * [`ProcessingError::WrongRegion`]: the record's region differs from
    ///   the configured one.
    /// * [`ProcessingError::GetObject`]: fetching the object failed.
    /// * [`ProcessingError::ReadObject`]: framing the body failed.
    /// * [`ProcessingError::PipelineSend`]: the downstream channel closed.
    /// * [`ProcessingError::ErrorAcknowledgement`]: delivery was not
    ///   acknowledged successfully.
    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version,
            });
        }
        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }
        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });
        let object = object_result?;
        let metadata = object.metadata;
        // A `last_modified` value that cannot be represented as a single UTC
        // instant is treated as absent instead of panicking the worker; the
        // metadata code below already copes with a missing timestamp.
        let timestamp = object
            .last_modified
            .and_then(|ts| Utc.timestamp_opt(ts.secs(), ts.subsec_nanos()).single());
        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;
        // Filled in by the framing closure below when reading fails; the
        // stream simply ends at that point and the error is surfaced after
        // sending.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };
        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                // A frame that fails to deserialize yields no events and does
                // not fail the object.
                // NOTE(review): presumably the decoder reports its own
                // errors; confirm, since nothing is emitted here.
                Err(_error) => SmallVec::new(),
            };
            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });
        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };
        // Dropping the stream ends the closures' borrows of `read_error` and
        // `batch`.
        drop(stream);
        // NOTE(review): our notifier handle is released before awaiting the
        // receiver, presumably so the batch status can resolve once the
        // events themselves are finalized; confirm.
        drop(batch);
        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => Ok(()),
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement)
                            }
                        }
                    }
                }
            }
        }
    }
    /// Issues one SQS receive call using the configured queue URL, batch
    /// size, visibility timeout and wait time.
    ///
    /// A response without a message list is returned as an empty vector.
    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        let state = &self.state;
        let request = state
            .sqs_client
            .receive_message()
            .queue_url(state.queue_url.clone())
            .max_number_of_messages(state.max_number_of_messages)
            .visibility_timeout(state.visibility_timeout_secs)
            .wait_time_seconds(state.poll_secs);

        request
            .send()
            .map_ok(|output| output.messages.unwrap_or_default())
            .await
    }
    /// Sends a single batch-delete request for `entries` against the
    /// configured queue and returns the raw SDK output or error.
    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        let state = &self.state;
        let request = state
            .sqs_client
            .delete_message_batch()
            .queue_url(state.queue_url.clone())
            .set_entries(Some(entries));

        request.send().await
    }
}
fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );
    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }
    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }
            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}
/// The subset of an SNS notification envelope this source needs: only the
/// `Message` field, which holds the wrapped payload as a string. All other
/// envelope fields are ignored.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
}
/// Payload of an SQS message: either a real S3 event or an S3 test event.
///
/// The enum is untagged, so variants are tried in declaration order and the
/// first one that deserializes successfully wins.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}
/// An S3 test event, deserialized from its PascalCase JSON fields
/// (`Service`, `Event`, `Bucket`). Other fields in the payload are ignored.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}
/// An S3 event notification: a list of records under the `Records` key.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}
/// One record of an S3 event notification, using camelCase JSON keys
/// (`eventVersion`, `eventSource`, `awsRegion`, `eventName`, `s3`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub s3: S3Message,
}
/// An S3 event version, serialized as the string `"<major>.<minor>"`.
#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}
impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}
/// Parses a `"<major>.<minor>"` string.
///
/// The input is split at the first `.` only, and each side must parse as a
/// `u64`; a string without a `.` fails with a missing-minor error once the
/// major part has parsed.
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let raw = String::deserialize(deserializer)?;
        // Parsing is lazy, so the major component is parsed (and may fail)
        // before the minor component is even looked at.
        let mut components = raw.splitn(2, '.').map(str::parse::<u64>);

        let major = components
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .map_err(D::Error::custom)?;
        let minor = components
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}
/// Serializes as the string `"<major>.<minor>"`.
impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let Self { major, minor } = self;
        let rendered = format!("{major}.{minor}");
        serializer.serialize_str(&rendered)
    }
}
/// An S3 event name, serialized as the string `"<kind>:<name>"`, for example
/// kind `ObjectCreated` with some name after the first colon.
#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}
/// Parses a `"<kind>:<name>"` string, splitting at the first `:` only, so the
/// name may itself contain colons. A string without a `:` is rejected.
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let raw = String::deserialize(deserializer)?;
        let mut pieces = raw.splitn(2, ':').map(str::to_owned);

        let kind = pieces
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?;
        let name = pieces
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?;

        Ok(S3EventName { kind, name })
    }
}
/// Serializes as the string `"<kind>:<name>"`.
impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let Self { kind, name } = self;
        let rendered = format!("{kind}:{name}");
        serializer.serialize_str(&rendered)
    }
}
/// The `s3` section of an event record: the bucket and the object concerned.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}
/// The bucket referenced by an event record; only its name is retained.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}
/// The object referenced by an event record.
///
/// `key` holds the decoded key: it is URL-decoded on deserialization and
/// percent-encoded again on serialization (see `urlencoded_string`).
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    #[serde(with = "urlencoded_string")]
    pub key: String,
}
/// Serde `with` adapter for URL-encoded strings such as S3 object keys.
mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    /// Deserializes a string and URL-decodes it: every `+` becomes a space,
    /// then percent-escapes are decoded.
    ///
    /// The input is read into an owned `String` rather than a borrowed byte
    /// slice, so it also works when the deserializer cannot lend the data,
    /// for example a JSON string containing escape sequences or a
    /// reader-based deserializer.
    ///
    /// # Errors
    ///
    /// Fails if the input is not a string or the decoded bytes are not valid
    /// UTF-8.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        let raw: String = serde::de::Deserialize::deserialize(deserializer)?;
        // `+` must be translated before percent-decoding so that an encoded
        // plus sign (`%2B`) survives as a literal `+`.
        let raw = if raw.contains('+') {
            raw.replace('+', " ")
        } else {
            raw
        };

        percent_decode(raw.as_bytes())
            .decode_utf8()
            .map(|decoded| decoded.into_owned())
            .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {}", err)))
    }

    /// Serializes `s` with every non-alphanumeric character percent-encoded.
    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}
/// Object keys are URL-decoded: `+` becomes a space and `%2b` a literal `+`.
#[test]
fn test_key_deserialize() {
    let cases = [
        (r#"{"key": "noog+nork"}"#, "noog nork"),
        (r#"{"key": "noog%2bnork"}"#, "noog+nork"),
    ];

    for (json, expected_key) in cases {
        let parsed: S3Object = serde_json::from_str(json).unwrap();
        let expected = S3Object {
            key: expected_key.to_string(),
        };
        assert_eq!(expected, parsed);
    }
}
/// An S3 test event payload deserializes into `S3TestEvent`, with the event
/// string split into kind and name at the colon.
#[test]
fn test_s3_testevent() {
    let payload = r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
     }"#;

    let parsed: S3TestEvent = serde_json::from_str(payload).unwrap();

    assert_eq!(parsed.service, "Amazon S3");
    assert_eq!(parsed.bucket, "bucketname");
    assert_eq!(parsed.event.kind, "s3");
    assert_eq!(parsed.event.name, "TestEvent");
}
/// An S3 test event wrapped in an SNS notification envelope is recovered by
/// first parsing the envelope and then parsing its `Message` string.
#[test]
fn test_s3_sns_testevent() {
    let envelope = r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
     }"#;

    let notification: SnsNotification = serde_json::from_str(envelope).unwrap();
    let parsed: S3TestEvent = serde_json::from_str(notification.message.as_ref()).unwrap();

    assert_eq!(parsed.service, "Amazon S3");
    assert_eq!(parsed.bucket, "bucketname");
    assert_eq!(parsed.event.kind, "s3");
    assert_eq!(parsed.event.name, "TestEvent");
}