use std::time::Duration;
use vector_lib::ipallowlist::IpAllowlistConfig;
use chrono::Utc;
use serde_with::serde_as;
use smallvec::SmallVec;
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
use crate::{
    codecs::Decoder,
    event::Event,
    serde::default_decoding,
    sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource},
    tcp::TcpKeepaliveConfig,
    tls::TlsSourceConfig,
};
use super::{default_host_key, SocketConfig};
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpConfig {
    #[configurable(derived)]
    address: SocketListenAddr,
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,
    #[serde(default = "default_shutdown_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
    shutdown_timeout_secs: Duration,
    host_key: Option<OptionalValuePath>,
    #[serde(default = "default_port_key")]
    port_key: OptionalValuePath,
    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,
    #[configurable(metadata(docs::type_unit = "bytes"))]
    receive_buffer_bytes: Option<usize>,
    #[configurable(metadata(docs::type_unit = "seconds"))]
    max_connection_duration_secs: Option<u64>,
    #[configurable(metadata(docs::type_unit = "connections"))]
    pub connection_limit: Option<u32>,
    #[configurable(derived)]
    pub(super) framing: Option<FramingConfig>,
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub(super) decoding: DeserializerConfig,
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,
}
const fn default_shutdown_timeout_secs() -> Duration {
    Duration::from_secs(30)
}
fn default_port_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("port"))
}
impl TcpConfig {
    pub fn from_address(address: SocketListenAddr) -> Self {
        Self {
            address,
            keepalive: None,
            shutdown_timeout_secs: default_shutdown_timeout_secs(),
            host_key: None,
            port_key: default_port_key(),
            permit_origin: None,
            tls: None,
            receive_buffer_bytes: None,
            max_connection_duration_secs: None,
            framing: None,
            decoding: default_decoding(),
            connection_limit: None,
            log_namespace: None,
        }
    }
    pub(super) fn host_key(&self) -> OptionalValuePath {
        self.host_key.clone().unwrap_or(default_host_key())
    }
    pub const fn port_key(&self) -> &OptionalValuePath {
        &self.port_key
    }
    pub const fn tls(&self) -> &Option<TlsSourceConfig> {
        &self.tls
    }
    pub const fn framing(&self) -> &Option<FramingConfig> {
        &self.framing
    }
    pub const fn decoding(&self) -> &DeserializerConfig {
        &self.decoding
    }
    pub const fn address(&self) -> SocketListenAddr {
        self.address
    }
    pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
        self.keepalive
    }
    pub const fn shutdown_timeout_secs(&self) -> Duration {
        self.shutdown_timeout_secs
    }
    pub const fn receive_buffer_bytes(&self) -> Option<usize> {
        self.receive_buffer_bytes
    }
    pub const fn max_connection_duration_secs(&self) -> Option<u64> {
        self.max_connection_duration_secs
    }
    pub fn set_max_connection_duration_secs(&mut self, val: Option<u64>) -> &mut Self {
        self.max_connection_duration_secs = val;
        self
    }
    pub fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
        self.shutdown_timeout_secs = Duration::from_secs(val);
        self
    }
    pub fn set_tls(&mut self, val: Option<TlsSourceConfig>) -> &mut Self {
        self.tls = val;
        self
    }
    pub fn set_framing(&mut self, val: Option<FramingConfig>) -> &mut Self {
        self.framing = val;
        self
    }
    pub fn set_decoding(&mut self, val: DeserializerConfig) -> &mut Self {
        self.decoding = val;
        self
    }
    pub fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
        self.log_namespace = val;
        self
    }
}
#[derive(Clone)]
pub struct RawTcpSource {
    config: TcpConfig,
    decoder: Decoder,
    log_namespace: LogNamespace,
}
impl RawTcpSource {
    pub const fn new(config: TcpConfig, decoder: Decoder, log_namespace: LogNamespace) -> Self {
        Self {
            config,
            decoder,
            log_namespace,
        }
    }
}
impl TcpSource for RawTcpSource {
    type Error = vector_lib::codecs::decoding::Error;
    type Item = SmallVec<[Event; 1]>;
    type Decoder = Decoder;
    type Acker = TcpNullAcker;
    fn decoder(&self) -> Self::Decoder {
        self.decoder.clone()
    }
    fn handle_events(&self, events: &mut [Event], host: std::net::SocketAddr) {
        let now = Utc::now();
        for event in events {
            if let Event::Log(ref mut log) = event {
                self.log_namespace.insert_standard_vector_source_metadata(
                    log,
                    SocketConfig::NAME,
                    now,
                );
                let legacy_host_key = self
                    .config
                    .host_key
                    .clone()
                    .unwrap_or(default_host_key())
                    .path;
                self.log_namespace.insert_source_metadata(
                    SocketConfig::NAME,
                    log,
                    legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
                    path!("host"),
                    host.ip().to_string(),
                );
                let legacy_port_key = self.config.port_key.clone().path;
                self.log_namespace.insert_source_metadata(
                    SocketConfig::NAME,
                    log,
                    legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
                    path!("port"),
                    host.port(),
                );
            }
        }
    }
    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
        TcpNullAcker
    }
}