use std::{
    collections::HashSet,
    convert::TryInto,
    fmt::Debug,
    net::{IpAddr, Ipv4Addr, Ipv6Addr},
    sync::LazyLock,
};
use base64::prelude::{Engine as _, BASE64_STANDARD};
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use dnsmsg_parser::{dns_message_parser::DnsParserOptions, ede::EDE};
use hickory_proto::{
    rr::domain::Name,
    serialize::binary::{BinDecodable, BinDecoder},
};
use prost::Message;
use snafu::Snafu;
use vrl::{owned_value_path, path};
use crate::{
    event::{LogEvent, Value},
    internal_events::DnstapParseWarning,
    Error, Result,
};
#[allow(warnings, clippy::all, clippy::pedantic, clippy::nursery)]
mod dnstap_proto {
    include!(concat!(env!("OUT_DIR"), "/dnstap.rs"));
}
use crate::sources::dnstap::schema::DNSTAP_VALUE_PATHS;
use dnstap_proto::{
    message::Type as DnstapMessageType, Dnstap, Message as DnstapMessage, SocketFamily,
    SocketProtocol,
};
use vector_lib::config::log_schema;
use vector_lib::lookup::lookup_v2::ValuePath;
use vector_lib::lookup::PathPrefix;
use super::{
    dns_message::{
        DnsRecord, EdnsOptionEntry, OptPseudoSection, QueryHeader, QueryQuestion, UpdateHeader,
        ZoneInfo,
    },
    dns_message_parser::DnsMessageParser,
};
#[derive(Debug, Snafu)]
enum DnstapParserError {
    #[snafu(display("Unsupported DNSTap message type: {}", "dnstap_message_type_id"))]
    UnsupportedDnstapMessageTypeError { dnstap_message_type_id: i32 },
}
static DNSTAP_MESSAGE_REQUEST_TYPE_IDS: LazyLock<HashSet<i32>> = LazyLock::new(|| {
    vec![
        DnstapMessageType::AuthQuery as i32,
        DnstapMessageType::ResolverQuery as i32,
        DnstapMessageType::ClientQuery as i32,
        DnstapMessageType::ForwarderQuery as i32,
        DnstapMessageType::StubQuery as i32,
        DnstapMessageType::ToolQuery as i32,
        DnstapMessageType::UpdateQuery as i32,
    ]
    .into_iter()
    .collect()
});
static DNSTAP_MESSAGE_RESPONSE_TYPE_IDS: LazyLock<HashSet<i32>> = LazyLock::new(|| {
    vec![
        DnstapMessageType::AuthResponse as i32,
        DnstapMessageType::ResolverResponse as i32,
        DnstapMessageType::ClientResponse as i32,
        DnstapMessageType::ForwarderResponse as i32,
        DnstapMessageType::StubResponse as i32,
        DnstapMessageType::ToolResponse as i32,
        DnstapMessageType::UpdateResponse as i32,
    ]
    .into_iter()
    .collect()
});
#[derive(Default)]
pub struct DnstapParser;
impl DnstapParser {
    fn insert<'a, V>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        path: impl ValuePath<'a>,
        value: V,
    ) -> Option<Value>
    where
        V: Into<Value> + Debug,
    {
        event.insert((PathPrefix::Event, prefix.concat(path)), value)
    }
    pub fn parse(
        event: &mut LogEvent,
        frame: Bytes,
        parsing_options: DnsParserOptions,
    ) -> Result<()> {
        let proto_msg = Dnstap::decode(frame.clone())?;
        let root = owned_value_path!();
        if let Some(server_id) = proto_msg.identity {
            DnstapParser::insert(
                event,
                &root,
                &DNSTAP_VALUE_PATHS.server_identity,
                String::from_utf8(server_id.clone()).unwrap_or_default(),
            );
        }
        if let Some(version) = proto_msg.version {
            DnstapParser::insert(
                event,
                &root,
                &DNSTAP_VALUE_PATHS.server_version,
                String::from_utf8(version).unwrap_or_default(),
            );
        }
        if let Some(extra) = proto_msg.extra {
            DnstapParser::insert(
                event,
                &root,
                &DNSTAP_VALUE_PATHS.extra,
                String::from_utf8(extra).unwrap_or_default(),
            );
        }
        let dnstap_data_type_id: i32 = proto_msg.r#type;
        let mut need_raw_data = false;
        DnstapParser::insert(
            event,
            &root,
            &DNSTAP_VALUE_PATHS.data_type_id,
            dnstap_data_type_id,
        );
        if let Some(dnstap_data_type) = to_dnstap_data_type(dnstap_data_type_id) {
            DnstapParser::insert(
                event,
                &root,
                &DNSTAP_VALUE_PATHS.data_type,
                dnstap_data_type.clone(),
            );
            if dnstap_data_type == "Message" {
                if let Some(message) = proto_msg.message {
                    if let Err(err) =
                        DnstapParser::parse_dnstap_message(event, &root, message, parsing_options)
                    {
                        emit!(DnstapParseWarning { error: &err });
                        need_raw_data = true;
                        DnstapParser::insert(
                            event,
                            &root,
                            &DNSTAP_VALUE_PATHS.error,
                            err.to_string(),
                        );
                    }
                }
            }
        } else {
            emit!(DnstapParseWarning {
                error: format!("Unknown dnstap data type: {}", dnstap_data_type_id)
            });
            need_raw_data = true;
        }
        if need_raw_data {
            DnstapParser::insert(
                event,
                &root,
                &DNSTAP_VALUE_PATHS.raw_data,
                BASE64_STANDARD.encode(&frame),
            );
        }
        Ok(())
    }
    fn parse_dnstap_message<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        dnstap_message: DnstapMessage,
        parsing_options: DnsParserOptions,
    ) -> Result<()> {
        if let Some(socket_family) = dnstap_message.socket_family {
            DnstapParser::parse_dnstap_message_socket_family(
                event,
                prefix.clone(),
                socket_family,
                &dnstap_message,
            )?;
        }
        if let Some(query_zone) = dnstap_message.query_zone.as_ref() {
            let mut decoder: BinDecoder = BinDecoder::new(query_zone);
            match Name::read(&mut decoder) {
                Ok(raw_data) => {
                    DnstapParser::insert(
                        event,
                        prefix.clone(),
                        &DNSTAP_VALUE_PATHS.query_zone,
                        raw_data.to_utf8(),
                    );
                }
                Err(error) => return Err(Error::from(error.to_string())),
            }
        }
        let dnstap_message_type_id = dnstap_message.r#type;
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.message_type_id,
            dnstap_message_type_id,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.message_type,
            to_dnstap_message_type(dnstap_message_type_id),
        );
        if let Some(query_time_sec) = dnstap_message.query_time_sec {
            DnstapParser::parse_dnstap_message_time(
                event,
                prefix.clone(),
                query_time_sec,
                dnstap_message.query_time_nsec,
                dnstap_message_type_id,
                dnstap_message.query_message.as_ref(),
                &DNSTAP_MESSAGE_REQUEST_TYPE_IDS,
            );
        }
        if let Some(response_time_sec) = dnstap_message.response_time_sec {
            DnstapParser::parse_dnstap_message_time(
                event,
                prefix.clone(),
                response_time_sec,
                dnstap_message.response_time_nsec,
                dnstap_message_type_id,
                dnstap_message.response_message.as_ref(),
                &DNSTAP_MESSAGE_RESPONSE_TYPE_IDS,
            );
        }
        DnstapParser::parse_dnstap_message_type(
            event,
            prefix.clone(),
            dnstap_message_type_id,
            dnstap_message,
            parsing_options,
        )?;
        Ok(())
    }
    fn parse_dnstap_message_type<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        dnstap_message_type_id: i32,
        dnstap_message: DnstapMessage,
        parsing_options: DnsParserOptions,
    ) -> Result<()> {
        match dnstap_message_type_id {
            1..=12 => {
                if let Some(query_message) = dnstap_message.query_message {
                    let mut query_message_parser =
                        DnsMessageParser::with_options(query_message, parsing_options.clone());
                    if let Err(error) = DnstapParser::parse_dns_query_message(
                        event,
                        prefix.concat(&DNSTAP_VALUE_PATHS.request_message),
                        &mut query_message_parser,
                    ) {
                        DnstapParser::log_raw_dns_message(
                            event,
                            prefix.concat(&DNSTAP_VALUE_PATHS.request_message),
                            query_message_parser.raw_message(),
                        );
                        return Err(error);
                    };
                }
                if let Some(response_message) = dnstap_message.response_message {
                    let mut response_message_parser =
                        DnsMessageParser::with_options(response_message, parsing_options);
                    if let Err(error) = DnstapParser::parse_dns_query_message(
                        event,
                        prefix.concat(&DNSTAP_VALUE_PATHS.response_message),
                        &mut response_message_parser,
                    ) {
                        DnstapParser::log_raw_dns_message(
                            event,
                            prefix.concat(&DNSTAP_VALUE_PATHS.response_message),
                            response_message_parser.raw_message(),
                        );
                        return Err(error);
                    };
                }
            }
            13 | 14 => {
                if let Some(update_request_message) = dnstap_message.query_message {
                    let mut update_request_message_parser = DnsMessageParser::with_options(
                        update_request_message,
                        parsing_options.clone(),
                    );
                    if let Err(error) = DnstapParser::parse_dns_update_message(
                        event,
                        &DNSTAP_VALUE_PATHS.request_message,
                        &mut update_request_message_parser,
                    ) {
                        DnstapParser::log_raw_dns_message(
                            event,
                            &DNSTAP_VALUE_PATHS.request_message,
                            update_request_message_parser.raw_message(),
                        );
                        return Err(error);
                    };
                }
                if let Some(update_response_message) = dnstap_message.response_message {
                    let mut update_response_message_parser =
                        DnsMessageParser::with_options(update_response_message, parsing_options);
                    if let Err(error) = DnstapParser::parse_dns_update_message(
                        event,
                        &DNSTAP_VALUE_PATHS.response_message,
                        &mut update_response_message_parser,
                    ) {
                        DnstapParser::log_raw_dns_message(
                            event,
                            &DNSTAP_VALUE_PATHS.response_message,
                            update_response_message_parser.raw_message(),
                        );
                        return Err(error);
                    };
                }
            }
            _ => {
                return Err(Box::new(
                    DnstapParserError::UnsupportedDnstapMessageTypeError {
                        dnstap_message_type_id,
                    },
                ));
            }
        }
        Ok(())
    }
    fn parse_dnstap_message_time<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        time_sec: u64,
        time_nsec: Option<u32>,
        dnstap_message_type_id: i32,
        message: Option<&Vec<u8>>,
        type_ids: &HashSet<i32>,
    ) {
        let (time_in_nanosec, query_time_nsec) = match time_nsec {
            Some(nsec) => (time_sec as i64 * 1_000_000_000_i64 + nsec as i64, nsec),
            None => (time_sec as i64 * 1_000_000_000_i64, 0),
        };
        if type_ids.contains(&dnstap_message_type_id) {
            DnstapParser::log_time(event, prefix.clone(), time_in_nanosec, "ns");
            let timestamp = Utc
                .timestamp_opt(time_sec.try_into().unwrap(), query_time_nsec)
                .single()
                .expect("invalid timestamp");
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                DnstapParser::insert(event, prefix.clone(), timestamp_key, timestamp);
            }
        }
        if message.is_none() {
            DnstapParser::log_time(
                event,
                prefix.concat(&DNSTAP_VALUE_PATHS.request_message),
                time_in_nanosec,
                "ns",
            );
        }
    }
    fn parse_dnstap_message_socket_family<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        socket_family: i32,
        dnstap_message: &DnstapMessage,
    ) -> Result<()> {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.socket_family,
            to_socket_family_name(socket_family)?.to_string(),
        );
        if let Some(socket_protocol) = dnstap_message.socket_protocol {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.socket_protocol,
                to_socket_protocol_name(socket_protocol)?.to_string(),
            );
        }
        if let Some(query_address) = dnstap_message.query_address.as_ref() {
            let source_address = if socket_family == 1 {
                let address_buffer: [u8; 4] = query_address[0..4].try_into()?;
                IpAddr::V4(Ipv4Addr::from(address_buffer))
            } else {
                let address_buffer: [u8; 16] = query_address[0..16].try_into()?;
                IpAddr::V6(Ipv6Addr::from(address_buffer))
            };
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.query_address,
                source_address.to_string(),
            );
        }
        if let Some(query_port) = dnstap_message.query_port {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.query_port,
                query_port,
            );
        }
        if let Some(response_address) = dnstap_message.response_address.as_ref() {
            let response_addr = if socket_family == 1 {
                let address_buffer: [u8; 4] = response_address[0..4].try_into()?;
                IpAddr::V4(Ipv4Addr::from(address_buffer))
            } else {
                let address_buffer: [u8; 16] = response_address[0..16].try_into()?;
                IpAddr::V6(Ipv6Addr::from(address_buffer))
            };
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.response_address,
                response_addr.to_string(),
            );
        }
        Ok(if let Some(response_port) = dnstap_message.response_port {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.response_port,
                response_port,
            );
        })
    }
    fn log_time<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        time: i64,
        time_precision: &str,
    ) {
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.time, time);
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.time_precision,
            time_precision.to_string(),
        );
    }
    fn log_raw_dns_message<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        raw_dns_message: &[u8],
    ) {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.raw_data,
            BASE64_STANDARD.encode(raw_dns_message),
        );
    }
    fn parse_dns_query_message<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        dns_message_parser: &mut DnsMessageParser,
    ) -> Result<()> {
        let msg = dns_message_parser.parse_as_query_message()?;
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.response_code,
            msg.response_code,
        );
        if let Some(response) = msg.response {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.response,
                response.to_string(),
            );
        }
        DnstapParser::log_dns_query_message_header(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.header),
            &msg.header,
        );
        DnstapParser::log_dns_query_message_query_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.question_section),
            &msg.question_section,
        );
        DnstapParser::log_dns_message_record_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.answer_section),
            &msg.answer_section,
        );
        DnstapParser::log_dns_message_record_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.authority_section),
            &msg.authority_section,
        );
        DnstapParser::log_dns_message_record_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.additional_section),
            &msg.additional_section,
        );
        DnstapParser::log_edns(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.opt_pseudo_section),
            &msg.opt_pseudo_section,
        );
        Ok(())
    }
    fn log_dns_query_message_header<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        header: &QueryHeader,
    ) {
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.id, header.id);
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.opcode,
            header.opcode,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.rcode,
            u16::from(header.rcode),
        );
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.qr, header.qr);
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.aa, header.aa);
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.tc, header.tc);
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.rd, header.rd);
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.ra, header.ra);
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.ad, header.ad);
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.cd, header.cd);
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.question_count,
            header.question_count,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.answer_count,
            header.answer_count,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.authority_count,
            header.authority_count,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.ar_count,
            header.additional_count,
        );
    }
    fn log_dns_query_message_query_section<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        questions: &[QueryQuestion],
    ) {
        for (i, query) in questions.iter().enumerate() {
            let index_segment = path!(i as isize);
            DnstapParser::log_dns_query_question(event, prefix.concat(index_segment), query);
        }
    }
    fn log_dns_query_question<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        question: &QueryQuestion,
    ) {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.domain_name,
            question.name.clone(),
        );
        if let Some(record_type) = question.record_type.clone() {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.question_type,
                record_type,
            );
        }
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.question_type_id,
            question.record_type_id,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.class,
            question.class.clone(),
        );
    }
    fn parse_dns_update_message<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        dns_message_parser: &mut DnsMessageParser,
    ) -> Result<()> {
        let msg = dns_message_parser.parse_as_update_message()?;
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.response_code,
            msg.response_code,
        );
        if let Some(response) = msg.response {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.response,
                response.to_string(),
            );
        }
        DnstapParser::log_dns_update_message_header(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.header),
            &msg.header,
        );
        DnstapParser::log_dns_update_message_zone_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.zone_section),
            &msg.zone_to_update,
        );
        DnstapParser::log_dns_message_record_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.prerequisite_section),
            &msg.prerequisite_section,
        );
        DnstapParser::log_dns_message_record_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.update_section),
            &msg.update_section,
        );
        DnstapParser::log_dns_message_record_section(
            event,
            prefix.concat(&DNSTAP_VALUE_PATHS.additional_section),
            &msg.additional_section,
        );
        Ok(())
    }
    fn log_dns_update_message_header<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        header: &UpdateHeader,
    ) {
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.id, header.id);
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.opcode,
            header.opcode,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.rcode,
            u16::from(header.rcode),
        );
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.qr, header.qr);
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.zone_count,
            header.zone_count,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.prerequisite_count,
            header.prerequisite_count,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.update_count,
            header.update_count,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.ad_count,
            header.additional_count,
        );
    }
    fn log_dns_update_message_zone_section<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        zone: &ZoneInfo,
    ) {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.zone_name,
            zone.name.clone(),
        );
        if let Some(zone_type) = zone.zone_type.clone() {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.zone_type,
                zone_type,
            );
        }
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.zone_type_id,
            zone.zone_type_id,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.zone_class,
            zone.class.clone(),
        );
    }
    fn log_edns<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        opt_section: &Option<OptPseudoSection>,
    ) {
        if let Some(edns) = opt_section {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.extended_rcode,
                edns.extended_rcode,
            );
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.version,
                edns.version,
            );
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.do_flag,
                edns.dnssec_ok,
            );
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.udp_max_payload_size,
                edns.udp_max_payload_size,
            );
            DnstapParser::log_edns_ede(event, prefix.concat(&DNSTAP_VALUE_PATHS.ede), &edns.ede);
            DnstapParser::log_edns_options(
                event,
                prefix.concat(&DNSTAP_VALUE_PATHS.options),
                &edns.options,
            );
        }
    }
    fn log_edns_ede<'a>(event: &mut LogEvent, prefix: impl ValuePath<'a>, options: &[EDE]) {
        options.iter().enumerate().for_each(|(i, entry)| {
            let index_segment = path!(i as isize);
            DnstapParser::log_edns_ede_entry(event, prefix.concat(index_segment), entry);
        });
    }
    fn log_edns_ede_entry<'a>(event: &mut LogEvent, prefix: impl ValuePath<'a>, entry: &EDE) {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.info_code,
            entry.info_code(),
        );
        if let Some(purpose) = entry.purpose() {
            DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.purpose, purpose);
        }
        if let Some(extra_text) = entry.extra_text() {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.extra_text,
                extra_text,
            );
        }
    }
    fn log_edns_options<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        options: &[EdnsOptionEntry],
    ) {
        options.iter().enumerate().for_each(|(i, opt)| {
            let index_segment = path!(i as isize);
            DnstapParser::log_edns_opt(event, prefix.concat(index_segment), opt);
        });
    }
    fn log_edns_opt<'a>(event: &mut LogEvent, prefix: impl ValuePath<'a>, opt: &EdnsOptionEntry) {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.opt_code,
            opt.opt_code,
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.opt_name,
            opt.opt_name.clone(),
        );
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.opt_data,
            opt.opt_data.clone(),
        );
    }
    fn log_dns_message_record_section<'a>(
        event: &mut LogEvent,
        prefix: impl ValuePath<'a>,
        records: &[DnsRecord],
    ) {
        for (i, record) in records.iter().enumerate() {
            let index_segment = path!(i as isize);
            DnstapParser::log_dns_record(event, prefix.concat(index_segment), record);
        }
    }
    fn log_dns_record<'a>(event: &mut LogEvent, prefix: impl ValuePath<'a>, record: &DnsRecord) {
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.domain_name,
            record.name.clone(),
        );
        if let Some(record_type) = record.record_type.clone() {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.record_type,
                record_type,
            );
        }
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.record_type_id,
            record.record_type_id,
        );
        DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.ttl, record.ttl);
        DnstapParser::insert(
            event,
            prefix.clone(),
            &DNSTAP_VALUE_PATHS.class,
            record.class.clone(),
        );
        if let Some(rdata) = &record.rdata {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.rdata,
                rdata.to_string(),
            );
        };
        if let Some(rdata_bytes) = &record.rdata_bytes {
            DnstapParser::insert(
                event,
                prefix.clone(),
                &DNSTAP_VALUE_PATHS.rdata_bytes,
                BASE64_STANDARD.encode(rdata_bytes),
            );
        };
    }
}
fn to_socket_family_name(socket_family: i32) -> Result<&'static str> {
    if socket_family == SocketFamily::Inet as i32 {
        Ok("INET")
    } else if socket_family == SocketFamily::Inet6 as i32 {
        Ok("INET6")
    } else {
        Err(Error::from(format!(
            "Unknown socket family: {}",
            socket_family
        )))
    }
}
fn to_socket_protocol_name(socket_protocol: i32) -> Result<&'static str> {
    if socket_protocol == SocketProtocol::Udp as i32 {
        Ok("UDP")
    } else if socket_protocol == SocketProtocol::Tcp as i32 {
        Ok("TCP")
    } else if socket_protocol == SocketProtocol::Dot as i32 {
        Ok("DOT")
    } else if socket_protocol == SocketProtocol::Doh as i32 {
        Ok("DOH")
    } else if socket_protocol == SocketProtocol::DnsCryptUdp as i32 {
        Ok("DNSCryptUDP")
    } else if socket_protocol == SocketProtocol::DnsCryptTcp as i32 {
        Ok("DNSCryptTCP")
    } else {
        Err(Error::from(format!(
            "Unknown socket protocol: {}",
            socket_protocol
        )))
    }
}
fn to_dnstap_data_type(data_type_id: i32) -> Option<String> {
    match data_type_id {
        1 => Some(String::from("Message")),
        _ => None,
    }
}
fn to_dnstap_message_type(type_id: i32) -> String {
    match type_id {
        1 => String::from("AuthQuery"),
        2 => String::from("AuthResponse"),
        3 => String::from("ResolverQuery"),
        4 => String::from("ResolverResponse"),
        5 => String::from("ClientQuery"),
        6 => String::from("ClientResponse"),
        7 => String::from("ForwarderQuery"),
        8 => String::from("ForwarderResponse"),
        9 => String::from("StubQuery"),
        10 => String::from("StubResponse"),
        11 => String::from("ToolQuery"),
        12 => String::from("ToolResponse"),
        13 => String::from("UpdateQuery"),
        14 => String::from("UpdateResponse"),
        _ => format!("Unknown dnstap message type: {}", type_id),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Value;
    use chrono::DateTime;
    use dnsmsg_parser::dns_message_parser::DnsParserOptions;
    use std::collections::BTreeMap;
    #[test]
    fn test_parse_dnstap_data_with_query_message() {
        let mut log_event = LogEvent::default();
        let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcnoIAxACGAEiEAAAAAAAAA\
        AAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb\
        20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAHgB";
        let dnstap_data = BASE64_STANDARD
            .decode(raw_dnstap_data)
            .expect("Invalid base64 encoded data.");
        let parse_result = DnstapParser::parse(
            &mut log_event,
            Bytes::from(dnstap_data),
            DnsParserOptions::default(),
        );
        assert!(parse_result.is_ok());
        let expected_map: BTreeMap<&str, Value> = BTreeMap::from([
            ("dataType", Value::Bytes(Bytes::from("Message"))),
            ("dataTypeId", Value::Integer(1)),
            ("messageType", Value::Bytes(Bytes::from("ResolverQuery"))),
            ("messageTypeId", Value::Integer(3)),
            ("queryZone", Value::Bytes(Bytes::from("com."))),
            ("requestData.fullRcode", Value::Integer(0)),
            ("requestData.header.aa", Value::Boolean(false)),
            ("requestData.header.ad", Value::Boolean(false)),
            ("requestData.header.anCount", Value::Integer(0)),
            ("requestData.header.arCount", Value::Integer(1)),
            ("requestData.header.cd", Value::Boolean(false)),
            ("requestData.header.id", Value::Integer(37634)),
            ("requestData.header.nsCount", Value::Integer(0)),
            ("requestData.header.opcode", Value::Integer(0)),
            ("requestData.header.qdCount", Value::Integer(1)),
            ("requestData.header.qr", Value::Integer(0)),
            ("requestData.header.ra", Value::Boolean(false)),
            ("requestData.header.rcode", Value::Integer(0)),
            ("requestData.header.rd", Value::Boolean(false)),
            ("requestData.header.tc", Value::Boolean(false)),
            ("requestData.opt.do", Value::Boolean(true)),
            ("requestData.opt.ednsVersion", Value::Integer(0)),
            ("requestData.opt.extendedRcode", Value::Integer(0)),
            ("requestData.opt.options[0].optCode", Value::Integer(10)),
            (
                "requestData.opt.options[0].optName",
                Value::Bytes(Bytes::from("Cookie")),
            ),
            (
                "requestData.opt.options[0].optValue",
                Value::Bytes(Bytes::from("7GMIAb3NWDM=")),
            ),
            ("requestData.opt.udpPayloadSize", Value::Integer(512)),
            (
                "requestData.question[0].class",
                Value::Bytes(Bytes::from("IN")),
            ),
            (
                "requestData.question[0].domainName",
                Value::Bytes(Bytes::from("facebook1.com.")),
            ),
            (
                "requestData.question[0].questionType",
                Value::Bytes(Bytes::from("A")),
            ),
            ("requestData.question[0].questionTypeId", Value::Integer(1)),
            (
                "requestData.rcodeName",
                Value::Bytes(Bytes::from("NoError")),
            ),
            (
                "responseAddress",
                Value::Bytes(Bytes::from("2001:502:7094::30")),
            ),
            ("responsePort", Value::Integer(53)),
            (
                "serverId",
                Value::Bytes(Bytes::from("james-Virtual-Machine")),
            ),
            ("serverVersion", Value::Bytes(Bytes::from("BIND 9.16.3"))),
            ("socketFamily", Value::Bytes(Bytes::from("INET6"))),
            ("socketProtocol", Value::Bytes(Bytes::from("UDP"))),
            ("sourceAddress", Value::Bytes(Bytes::from("::"))),
            ("sourcePort", Value::Integer(46835)),
            ("time", Value::Integer(1_593_489_007_920_014_129)),
            ("timePrecision", Value::Bytes(Bytes::from("ns"))),
            (
                "timestamp",
                Value::Timestamp(
                    Utc.from_utc_datetime(
                        &DateTime::parse_from_rfc3339("2020-06-30T03:50:07.920014129Z")
                            .unwrap()
                            .naive_utc(),
                    ),
                ),
            ),
        ]);
        for (exp_key, exp_value) in expected_map {
            let value = log_event.get(exp_key).unwrap();
            assert_eq!(*value, exp_value);
        }
    }
    #[test]
    fn test_parse_dnstap_data_lowercase_hostnames() {
        let mut log_event = LogEvent::default();
        let mut lowercase_log_event = LogEvent::default();
        let raw_dnstap_data = "Cgw2NzNiNWZiZWI5MmESMkJJTkQgOS4xOC4yMS0xK3VidW50dTIyLjA0LjErZGViLnN1cnkub3JnKzEtVWJ1bnR1cqkBCAYQARgBIgQKWQUeKgQKWQUqMMitAjg1YLXQp68GbZ9tBw9ygwGInoGAAAEABAAAAAEGVmVjdG9yA0RldgAAAQABwAwAAQABAAAAPAAEEvVWOMAMAAEAAQAAADwABBL1VnnADAABAAEAAAA8AAQS9VYSwAwAAQABAAAAPAAEEvVWWQAAKQTQAAAAAAAcAAoAGERDbSN8uKngAQAAAGXp6DXs0fbpv0n9F3gB";
        let dnstap_data = BASE64_STANDARD
            .decode(raw_dnstap_data)
            .expect("Invalid base64 encoded data.");
        let parse_result = DnstapParser::parse(
            &mut lowercase_log_event,
            Bytes::from(dnstap_data.clone()),
            DnsParserOptions {
                lowercase_hostnames: true,
            },
        );
        let no_lowercase_result = DnstapParser::parse(
            &mut log_event,
            Bytes::from(dnstap_data),
            DnsParserOptions::default(),
        );
        assert!(parse_result.is_ok());
        assert!(no_lowercase_result.is_ok());
        let no_lowercase_expected: BTreeMap<&str, Value> = BTreeMap::from([
            ("dataType", Value::Bytes(Bytes::from("Message"))),
            ("dataTypeId", Value::Integer(1)),
            (
                "responseData.answers[0].domainName",
                Value::Bytes(Bytes::from("Vector.Dev.")),
            ),
            (
                "responseData.question[0].domainName",
                Value::Bytes(Bytes::from("Vector.Dev.")),
            ),
        ]);
        let expected_map: BTreeMap<&str, Value> = BTreeMap::from([
            ("dataType", Value::Bytes(Bytes::from("Message"))),
            ("dataTypeId", Value::Integer(1)),
            (
                "responseData.answers[0].domainName",
                Value::Bytes(Bytes::from("vector.dev.")),
            ),
            (
                "responseData.question[0].domainName",
                Value::Bytes(Bytes::from("vector.dev.")),
            ),
        ]);
        for (exp_key, exp_value) in no_lowercase_expected {
            let value = log_event.get(exp_key).unwrap();
            assert_eq!(*value, exp_value);
        }
        for (exp_key, exp_value) in expected_map {
            let value = lowercase_log_event.get(exp_key).unwrap();
            assert_eq!(*value, exp_value);
        }
    }
    #[test]
    fn test_parse_dnstap_data_with_ede_options() {
        let mut log_event = LogEvent::default();
        let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zGgBy5wEIAxACGAEiEAAAAAAAAAAAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAGAAbQAAAAByZLM4AAAAAQAAAAAAAQJoNQdleGFtcGxlA2NvbQAABgABAAApBNABAUAAADkADwA1AAlubyBTRVAgbWF0Y2hpbmcgdGhlIERTIGZvdW5kIGZvciBkbnNzZWMtZmFpbGVkLm9yZy54AQ==";
        let dnstap_data = BASE64_STANDARD
            .decode(raw_dnstap_data)
            .expect("Invalid base64 encoded data.");
        let parse_result = DnstapParser::parse(
            &mut log_event,
            Bytes::from(dnstap_data),
            DnsParserOptions::default(),
        );
        assert!(parse_result.is_ok());
        let expected_map: BTreeMap<&str, Value> = BTreeMap::from([
            ("responseData.opt.ede[0].infoCode", Value::Integer(9)),
            (
                "responseData.opt.ede[0].purpose",
                Value::Bytes(Bytes::from("DNSKEY Missing")),
            ),
            (
                "responseData.opt.ede[0].extraText",
                Value::Bytes(Bytes::from(
                    "no SEP matching the DS found for dnssec-failed.org.",
                )),
            ),
        ]);
        for (exp_key, exp_value) in expected_map {
            let value = log_event.get(exp_key).unwrap();
            assert_eq!(*value, exp_value);
        }
    }
    #[test]
    fn test_parse_dnstap_data_with_update_message() {
        let mut log_event = LogEvent::default();
        let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcmsIDhABGAEiBH8AAA\
        EqBH8AAAEwrG44AEC+iu73BU14gfofUh1wi6gAAAEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAWC+iu73BW0agDwvch1wi6gAA\
        AEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAXgB";
        let dnstap_data = BASE64_STANDARD
            .decode(raw_dnstap_data)
            .expect("Invalid base64 encoded data.");
        let parse_result = DnstapParser::parse(
            &mut log_event,
            Bytes::from(dnstap_data),
            DnsParserOptions::default(),
        );
        assert!(parse_result.is_ok());
        let expected_map: BTreeMap<&str, Value> = BTreeMap::from([
            ("dataType", Value::Bytes(Bytes::from("Message"))),
            ("dataTypeId", Value::Integer(1)),
            ("messageType", Value::Bytes(Bytes::from("UpdateResponse"))),
            ("messageTypeId", Value::Integer(14)),
            ("requestData.fullRcode", Value::Integer(0)),
            ("requestData.header.adCount", Value::Integer(0)),
            ("requestData.header.id", Value::Integer(28811)),
            ("requestData.header.opcode", Value::Integer(5)),
            ("requestData.header.prCount", Value::Integer(0)),
            ("requestData.header.qr", Value::Integer(1)),
            ("requestData.header.rcode", Value::Integer(0)),
            ("requestData.header.upCount", Value::Integer(0)),
            ("requestData.header.zoCount", Value::Integer(1)),
            (
                "requestData.rcodeName",
                Value::Bytes(Bytes::from("NoError")),
            ),
            ("requestData.zone.zClass", Value::Bytes(Bytes::from("IN"))),
            (
                "requestData.zone.zName",
                Value::Bytes(Bytes::from("example.com.")),
            ),
            ("requestData.zone.zType", Value::Bytes(Bytes::from("SOA"))),
            ("requestData.zone.zTypeId", Value::Integer(6)),
            ("responseAddress", Value::Bytes(Bytes::from("127.0.0.1"))),
            ("responseData.fullRcode", Value::Integer(0)),
            ("responseData.header.adCount", Value::Integer(0)),
            ("responseData.header.id", Value::Integer(28811)),
            ("responseData.header.opcode", Value::Integer(5)),
            ("responseData.header.prCount", Value::Integer(0)),
            ("responseData.header.qr", Value::Integer(1)),
            ("responseData.header.rcode", Value::Integer(0)),
            ("responseData.header.upCount", Value::Integer(0)),
            ("responseData.header.zoCount", Value::Integer(1)),
            (
                "responseData.rcodeName",
                Value::Bytes(Bytes::from("NoError")),
            ),
            ("responseData.zone.zClass", Value::Bytes(Bytes::from("IN"))),
            (
                "responseData.zone.zName",
                Value::Bytes(Bytes::from("example.com.")),
            ),
            ("responseData.zone.zType", Value::Bytes(Bytes::from("SOA"))),
            ("responseData.zone.zTypeId", Value::Integer(6)),
            ("responsePort", Value::Integer(0)),
            (
                "serverId",
                Value::Bytes(Bytes::from("james-Virtual-Machine")),
            ),
            ("serverVersion", Value::Bytes(Bytes::from("BIND 9.16.3"))),
            ("socketFamily", Value::Bytes(Bytes::from("INET"))),
            ("socketProtocol", Value::Bytes(Bytes::from("UDP"))),
            ("sourceAddress", Value::Bytes(Bytes::from("127.0.0.1"))),
            ("sourcePort", Value::Integer(14124)),
            ("time", Value::Integer(1_593_541_950_792_494_106)),
            ("timePrecision", Value::Bytes(Bytes::from("ns"))),
            (
                "timestamp",
                Value::Timestamp(
                    Utc.from_utc_datetime(
                        &DateTime::parse_from_rfc3339("2020-06-30T18:32:30.792494106Z")
                            .unwrap()
                            .naive_utc(),
                    ),
                ),
            ),
        ]);
        for (exp_key, exp_value) in expected_map {
            let value = log_event.get(exp_key).unwrap();
            assert_eq!(*value, exp_value);
        }
    }
    #[test]
    fn test_parse_dnstap_data_with_invalid_data() {
        let mut log_event = LogEvent::default();
        let e = DnstapParser::parse(
            &mut log_event,
            Bytes::from(vec![1, 2, 3]),
            DnsParserOptions::default(),
        )
        .expect_err("Expected TrustDnsError.");
        assert!(e.to_string().contains("Protobuf message"));
    }
    #[test]
    fn test_get_socket_family_name() {
        assert_eq!("INET", to_socket_family_name(1).unwrap());
        assert_eq!("INET6", to_socket_family_name(2).unwrap());
        assert!(to_socket_family_name(3).is_err());
    }
    #[test]
    fn test_get_socket_protocol_name() {
        assert_eq!("UDP", to_socket_protocol_name(1).unwrap());
        assert_eq!("TCP", to_socket_protocol_name(2).unwrap());
        assert_eq!("DOT", to_socket_protocol_name(3).unwrap());
        assert_eq!("DOH", to_socket_protocol_name(4).unwrap());
        assert_eq!("DNSCryptUDP", to_socket_protocol_name(5).unwrap());
        assert_eq!("DNSCryptTCP", to_socket_protocol_name(6).unwrap());
        assert!(to_socket_protocol_name(7).is_err());
    }
}