use bytes::{Buf, BufMut};
use enumflags2::{bitflags, BitFlags, FromBitsError};
use prost::Message;
use snafu::Snafu;
use vector_buffers::encoding::{AsMetadata, Encodable};
use super::{proto, Event, EventArray};
/// Error returned when an item cannot be encoded into the caller's buffer.
#[derive(Debug, Snafu)]
pub enum EncodeError {
/// The destination buffer was too small to hold the complete encoded item.
#[snafu(display("the provided buffer was too small to fully encode this item"))]
BufferTooSmall,
}
/// Error returned when an item cannot be decoded from a buffer.
#[derive(Debug, Snafu)]
pub enum DecodeError {
/// The bytes in the buffer did not form a valid Protocol Buffers payload.
#[snafu(display(
"the provided buffer could not be decoded as a valid Protocol Buffers payload"
))]
InvalidProtobufPayload,
/// The encoding metadata supplied alongside the buffer is not supported in
/// the context where decoding was attempted.
#[snafu(display("unsupported encoding metadata for this context"))]
UnsupportedEncodingMetadata,
}
/// Individual flags describing how an encoded event payload was written.
///
/// Each variant occupies one bit of a `u32`; sets of flags are carried around
/// as `BitFlags<EventEncodableMetadataFlags>`.
#[bitflags]
#[repr(u32)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EventEncodableMetadataFlags {
/// When set, the `Encodable` implementation for `EventArray` in this file
/// decodes the payload as a `proto::EventArray` first and, if that fails,
/// as a single `proto::EventWrapper`.
// NOTE(review): the name suggests this exists for compatibility with
// v1 disk buffers -- confirm against the buffer implementation.
DiskBufferV1CompatibilityMode = 0b1,
}
/// Encoding metadata attached to encoded events: a newtype around a set of
/// [`EventEncodableMetadataFlags`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EventEncodableMetadata(BitFlags<EventEncodableMetadataFlags>);
impl EventEncodableMetadata {
fn contains(self, flag: EventEncodableMetadataFlags) -> bool {
self.0.contains(flag)
}
}
impl From<EventEncodableMetadataFlags> for EventEncodableMetadata {
fn from(flag: EventEncodableMetadataFlags) -> Self {
Self(BitFlags::from(flag))
}
}
impl From<BitFlags<EventEncodableMetadataFlags>> for EventEncodableMetadata {
fn from(flags: BitFlags<EventEncodableMetadataFlags>) -> Self {
Self(flags)
}
}
impl TryFrom<u32> for EventEncodableMetadata {
    type Error = FromBitsError<EventEncodableMetadataFlags>;

    /// Interprets a raw `u32` as a set of [`EventEncodableMetadataFlags`].
    ///
    /// # Errors
    ///
    /// Returns the `FromBitsError` produced by `BitFlags::try_from` when that
    /// conversion rejects `value`.
    fn try_from(value: u32) -> Result<Self, Self::Error> {
        match BitFlags::<EventEncodableMetadataFlags>::try_from(value) {
            Ok(flags) => Ok(Self(flags)),
            Err(error) => Err(error),
        }
    }
}
impl AsMetadata for EventEncodableMetadata {
    /// Returns the raw bit representation of the wrapped flag set.
    fn into_u32(self) -> u32 {
        let Self(flags) = self;
        flags.bits()
    }

    /// Rebuilds metadata from a raw `u32`, yielding `None` when the
    /// `TryFrom<u32>` conversion for this type fails.
    fn from_u32(value: u32) -> Option<Self> {
        match EventEncodableMetadata::try_from(value) {
            Ok(metadata) => Some(metadata),
            Err(_) => None,
        }
    }
}
impl Encodable for EventArray {
type Metadata = EventEncodableMetadata;
type EncodeError = EncodeError;
type DecodeError = DecodeError;
fn get_metadata() -> Self::Metadata {
EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode.into()
}
fn can_decode(metadata: Self::Metadata) -> bool {
metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode)
}
fn encode<B>(self, buffer: &mut B) -> Result<(), Self::EncodeError>
where
B: BufMut,
{
proto::EventArray::from(self)
.encode(buffer)
.map_err(|_| EncodeError::BufferTooSmall)
}
fn decode<B>(metadata: Self::Metadata, buffer: B) -> Result<Self, Self::DecodeError>
where
B: Buf + Clone,
{
if metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode) {
proto::EventArray::decode(buffer.clone())
.map(Into::into)
.or_else(|_| {
proto::EventWrapper::decode(buffer)
.map(|pe| EventArray::from(Event::from(pe)))
.map_err(|_| DecodeError::InvalidProtobufPayload)
})
} else {
Err(DecodeError::UnsupportedEncodingMetadata)
}
}
}