use ipnet::IpNet;
use std::time::Duration;
use bytes::Bytes;
use serde_with::serde_as;
use vector_lib::configurable::configurable_component;
use vector_lib::ipallowlist::IpAllowlistConfig;
use vector_lib::lookup::{owned_value_path, path};
use vector_lib::tcp::TcpKeepaliveConfig;
use vector_lib::tls::{CertificateMetadata, MaybeTlsSettings, TlsSourceConfig};
use vector_lib::EstimatedJsonEncodedSizeOf;
use vrl::path::OwnedValuePath;
use vrl::value::ObjectMap;
use crate::internal_events::{SocketEventsReceived, SocketMode};
use crate::sources::util::framestream::{FrameHandler, TcpFrameHandler};
use crate::{event::Event, sources::util::net::SocketListenAddr};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::lookup::lookup_v2::OptionalValuePath;
/// TCP listener configuration for the `dnstap` source.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpConfig {
// Listen address; its documentation is derived from `SocketListenAddr`.
#[configurable(derived)]
address: SocketListenAddr,
// Optional TCP keepalive settings; documentation is derived from the type.
#[configurable(derived)]
keepalive: Option<TcpKeepaliveConfig>,
/// Shutdown timeout, written as a whole number of seconds.
///
/// Defaults to 30 seconds.
// NOTE(review): presumably the grace period before open connections are
// force-closed during shutdown — confirm against the TCP listener code.
#[serde(default = "default_shutdown_timeout_secs")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
shutdown_timeout_secs: Duration,
/// Optional path of the port key.
///
/// Defaults to `port`.
// NOTE(review): presumably the event field that receives the peer's port —
// confirm against the code that consumes `port_key()`.
#[serde(default = "default_port_key")]
pub port_key: OptionalValuePath,
// Optional IP allowlist; converted into a list of `IpNet` by
// `DnstapFrameHandler::new`.
#[configurable(derived)]
permit_origin: Option<IpAllowlistConfig>,
// Optional TLS settings; documentation is derived from the type.
#[configurable(derived)]
tls: Option<TlsSourceConfig>,
/// Size of the receive buffer, in bytes.
#[configurable(metadata(docs::type_unit = "bytes"))]
receive_buffer_bytes: Option<usize>,
/// Maximum duration of a connection, in seconds.
#[configurable(metadata(docs::type_unit = "seconds"))]
max_connection_duration_secs: Option<u64>,
/// Limit on the number of connections.
#[configurable(metadata(docs::type_unit = "connections"))]
pub connection_limit: Option<u32>,
}
/// Default value for `TcpConfig::shutdown_timeout_secs`: thirty seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_SECS)
}
/// Default value for `TcpConfig::port_key`: the `port` path.
fn default_port_key() -> OptionalValuePath {
    let port_path = owned_value_path!("port");
    OptionalValuePath::from(port_path)
}
impl TcpConfig {
pub fn from_address(address: SocketListenAddr) -> Self {
Self {
address,
keepalive: None,
shutdown_timeout_secs: default_shutdown_timeout_secs(),
port_key: default_port_key(),
permit_origin: None,
tls: None,
receive_buffer_bytes: None,
max_connection_duration_secs: None,
connection_limit: None,
}
}
pub const fn port_key(&self) -> &OptionalValuePath {
&self.port_key
}
pub const fn tls(&self) -> &Option<TlsSourceConfig> {
&self.tls
}
pub const fn address(&self) -> SocketListenAddr {
self.address
}
pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
self.keepalive
}
pub const fn shutdown_timeout_secs(&self) -> Duration {
self.shutdown_timeout_secs
}
pub const fn receive_buffer_bytes(&self) -> Option<usize> {
self.receive_buffer_bytes
}
pub const fn max_connection_duration_secs(&self) -> Option<u64> {
self.max_connection_duration_secs
}
}
/// Wraps an inner `FrameHandler` and carries the TCP-specific settings taken
/// from a `TcpConfig`, along with any TLS client metadata to attach to log
/// events.
#[derive(Clone)]
pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
// Inner handler; the `FrameHandler` methods on this type delegate to it.
frame_handler: T,
// Listen address, copied from `TcpConfig::address`.
address: SocketListenAddr,
// Keepalive settings, copied from `TcpConfig::keepalive`.
keepalive: Option<TcpKeepaliveConfig>,
// Shutdown timeout, copied from `TcpConfig::shutdown_timeout_secs`.
shutdown_timeout_secs: Duration,
// TLS settings handed to `new` by the caller.
tls: MaybeTlsSettings,
// Path taken from the TLS config's `client_metadata_key`, when one is set.
tls_client_metadata_key: Option<OwnedValuePath>,
// Starts as `None`; filled in by `insert_tls_client_metadata`.
tls_client_metadata: Option<ObjectMap>,
// Receive buffer size in bytes, copied from `TcpConfig`.
receive_buffer_bytes: Option<usize>,
// Maximum connection duration in seconds, copied from `TcpConfig`.
max_connection_duration_secs: Option<u64>,
// Connection limit, copied from `TcpConfig::connection_limit`.
max_connections: Option<u32>,
// Allowed networks, converted from `TcpConfig::permit_origin`.
allowlist: Option<Vec<IpNet>>,
// Namespace used when inserting TLS client metadata into log events.
log_namespace: LogNamespace,
}
impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
pub fn new(
config: TcpConfig,
tls: MaybeTlsSettings,
frame_handler: T,
log_namespace: LogNamespace,
) -> Self {
let tls_client_metadata_key = config
.tls()
.as_ref()
.and_then(|tls| tls.client_metadata_key.clone())
.and_then(|k| k.path);
Self {
frame_handler,
address: config.address,
keepalive: config.keepalive,
shutdown_timeout_secs: config.shutdown_timeout_secs,
tls,
tls_client_metadata_key,
tls_client_metadata: None,
receive_buffer_bytes: config.receive_buffer_bytes,
max_connection_duration_secs: config.max_connection_duration_secs,
max_connections: config.connection_limit,
allowlist: config.permit_origin.map(Into::into),
log_namespace,
}
}
}
impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
    /// Delegates to the wrapped handler.
    fn content_type(&self) -> String {
        self.frame_handler.content_type()
    }

    /// Delegates to the wrapped handler.
    fn max_frame_length(&self) -> usize {
        self.frame_handler.max_frame_length()
    }

    /// Runs the wrapped handler on `frame` and post-processes log events.
    ///
    /// For a log event, any stored TLS client metadata is inserted as source
    /// metadata, then a `SocketEventsReceived` event is emitted with a count
    /// of one. Events of any other kind are returned unchanged.
    fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
        let produced = self.frame_handler.handle_event(received_from, frame)?;

        let event = match produced {
            Event::Log(mut log_event) => {
                if let Some(client_metadata) = self.tls_client_metadata.as_ref() {
                    let legacy_key = self
                        .tls_client_metadata_key
                        .as_ref()
                        .map(LegacyKey::Overwrite);
                    self.log_namespace.insert_source_metadata(
                        super::DnstapConfig::NAME,
                        &mut log_event,
                        legacy_key,
                        path!("tls_client_metadata"),
                        client_metadata.clone(),
                    );
                }

                // Size is measured after the metadata has been attached.
                emit!(SocketEventsReceived {
                    mode: SocketMode::Tcp,
                    byte_size: log_event.estimated_json_encoded_size_of(),
                    count: 1
                });

                Event::from(log_event)
            }
            other => other,
        };

        Some(event)
    }

    /// Delegates to the wrapped handler.
    fn multithreaded(&self) -> bool {
        self.frame_handler.multithreaded()
    }

    /// Delegates to the wrapped handler.
    fn max_frame_handling_tasks(&self) -> u32 {
        self.frame_handler.max_frame_handling_tasks()
    }

    /// Delegates to the wrapped handler.
    fn host_key(&self) -> &Option<OwnedValuePath> {
        self.frame_handler.host_key()
    }

    /// Delegates to the wrapped handler.
    fn source_type_key(&self) -> Option<&OwnedValuePath> {
        self.frame_handler.source_type_key()
    }

    /// Delegates to the wrapped handler.
    fn timestamp_key(&self) -> Option<&OwnedValuePath> {
        self.frame_handler.timestamp_key()
    }
}
impl<T: FrameHandler + Clone> TcpFrameHandler for DnstapFrameHandler<T> {
    /// Returns the listen address.
    fn address(&self) -> SocketListenAddr {
        self.address
    }

    /// Returns the TCP keepalive settings, if any.
    fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
        self.keepalive
    }

    /// Returns the shutdown timeout.
    fn shutdown_timeout_secs(&self) -> Duration {
        self.shutdown_timeout_secs
    }

    /// Returns a clone of the TLS settings.
    fn tls(&self) -> MaybeTlsSettings {
        self.tls.clone()
    }

    /// Returns a clone of the TLS client metadata key, if one is set.
    fn tls_client_metadata_key(&self) -> Option<OwnedValuePath> {
        self.tls_client_metadata_key.clone()
    }

    /// Returns the receive buffer size in bytes, if one is set.
    fn receive_buffer_bytes(&self) -> Option<usize> {
        self.receive_buffer_bytes
    }

    /// Returns the maximum connection duration in seconds, if one is set.
    fn max_connection_duration_secs(&self) -> Option<u64> {
        self.max_connection_duration_secs
    }

    /// Returns the connection limit, if one is set.
    fn max_connections(&self) -> Option<u32> {
        self.max_connections
    }

    /// Replaces the stored TLS client metadata.
    ///
    /// Given certificate metadata, stores a map holding only its `subject`;
    /// given `None`, clears the stored metadata.
    fn insert_tls_client_metadata(&mut self, metadata: Option<CertificateMetadata>) {
        let stored = metadata.map(|certificate| {
            let mut fields = ObjectMap::new();
            fields.insert("subject".into(), certificate.subject().into());
            fields
        });
        self.tls_client_metadata = stored;
    }

    /// Returns the allowed origin networks, if an allowlist is configured.
    fn allowed_origins(&self) -> Option<&[IpNet]> {
        self.allowlist.as_deref()
    }
}