Skip to main content

Video Streaming Output

Stream output is in the form of a serialized protobuf output of type quic_wrapper.

info

The maximum number of vehicles for which the video can be subscribed to in a single connection or request is 4. If you make a request with more than 4 vehicles, the connection will not go through and it will throw an error.

Example format:

syntax = "proto3";
message quic_wrapper {
enum Compression {
NONE = 0; ZSTD = 1;
}
int64 utime = 1;
bytes data = 2;
//mqtt_topic contains information about camera type. Example: if (mqtt_topic) contains "fc" then it is front center.
string mqtt_topic = 3;
Compression compression = 4;
}

data is of protobuf type May.image_t

Protobuf Decoding:

const quic_wrapper_deserialize = May.quic_wrapper.decode({ArrayOfStreamedData});   
const image = May.image_t.decode(quic_wrapper_deserialize.data);

Encoding Types:

image_t will contain the encoding type. Most of the data is in VP9 format.

  ENCODING_UNKNOWN = 0,
JPEG = 1,
VP8 = 2,
VP9 = 3

Frame Types:

image_t will contain the frame type information.

  TYPE_UNKNOWN = 0,
KEY = 1,
DELTA = 2

Sequencing Logic:

To improve the quality of the video, you need to sequence the frames coming from the server in order.

The decodingWithFrameSequencing method below contains the logic for comparing frames and ordering them in sequence.

// Buffer of out-of-order frames, keyed by their sequence number.
interface ImageCache {
[key: number]: May.image_t;
}

// Frames received ahead of the current playback position, awaiting decode.
cache: ImageCache;
// Sequence number of the most recently decoded frame.
// NOTE(review): _decodeAndSetTimestampAndSequence assigns undefined here when
// sequence <= 0, so the declared type should arguably be `number | undefined`.
sequence: number;
// Timestamp of the most recently decoded frame (taken from image.timestamp).
frameNumber: number;

/**
 * Entry point for a newly received frame.
 *
 * Frames whose timestamp and sequence are both zero carry no usable
 * ordering information and are dropped; everything else is handed to the
 * frame-sequencing logic.
 *
 * Bug fix: the original was missing the closing brace between `return;`
 * and `else`, which is a syntax error.
 */
displayDecode(image: May.image_t): void {
  const newFrameNumber = Number(image.timestamp);
  const newSequenceNumber = Number(image.sequence);

  if (newFrameNumber === 0 && newSequenceNumber === 0) {
    return;
  }
  this.decodingWithFrameSequencing(image, newSequenceNumber);
}

// Decides whether `image` can be decoded now or must be buffered.
// A frame is decoded immediately when it is the next frame in sequence or a
// future key frame (independently decodable); otherwise future frames are
// cached until the gap is filled or the buffer overflows.
decodingWithFrameSequencing(image: May.image_t, newSequenceNumber: number) {
const frameType = getFrameType(image);
// Cache size before any insertion below; used for the overflow check.
const cacheSize = Object.keys(this.cache).length;
// If this is the first key frame, just decode it.
if (this.sequence === undefined && frameType === 'key') {
this._decodeAndSetTimestampAndSequence(image, frameType);
} else if (this.sequence !== undefined) {
const isFutureFrame = newSequenceNumber > this.sequence;
// A future key frame needs no reference frame, so we can jump to it.
const isFutureIFrame = isFutureFrame && frameType === 'key';
const isNextFrame = newSequenceNumber === this.sequence + 1;
// Truthy when the frame after the current position is already buffered.
const nextFrameAlreadyCached = this.cache[this.sequence + 1];

if (isFutureIFrame || isNextFrame) {
// Decode now, then drain any contiguous buffered frames and drop stale ones.
this._decodeAndSetTimestampAndSequence(image, frameType);
this.playQueuedFollowUpFrames();
this.pruneOlderFramesFromBuffer(this.sequence);
} else {
if (nextFrameAlreadyCached) {
// The gap was already filled from the buffer; resume playback from there.
this.playQueuedFollowUpFrames();
this.pruneOlderFramesFromBuffer(this.sequence);
}
if (isFutureFrame) {
// Out-of-order future frame: buffer it until the gap closes.
this.cache[newSequenceNumber] = image;
if (cacheSize + 1 > MAX_BUFFER_SIZE) {
// Buffer full: decode what we can in order, then prune the rest.
this.fastForwardFramesAndThenPruneBuffer();
}
}
}
}
}

/**
 * Decodes `image` and records its timestamp/sequence as the new playback
 * position. A non-positive sequence clears the position so playback
 * restarts at the next key frame.
 */
_decodeAndSetTimestampAndSequence(
  image: May.image_t,
  frameType: EncodedVideoChunkType
) {
  const seq = Number(image.sequence);
  this._decode(image, frameType, Number(image.utime), seq);
  this.sequence = seq > 0 ? seq : undefined;
  this.frameNumber = Number(image.timestamp);
}

/**
 * Hands a single encoded frame to the underlying VideoDecoder.
 * `_sequence` is accepted for signature parity with callers but unused here.
 */
_decode(
  image: May.image_t,
  frameType: EncodedVideoChunkType,
  timestamp: number,
  _sequence: number
) {
  const chunk = new EncodedVideoChunk({
    data: image.data,
    timestamp,
    type: frameType
  });
  this.videoDecoder.decode(chunk);
}

/**
 * Decodes buffered frames for as long as their sequences are contiguous
 * (or a future key frame allows a jump), and deletes frames that fall
 * behind the playback position.
 *
 * Bug fix: `Object.keys(...).map(Number).sort()` used the default
 * lexicographic comparator, which mis-orders numbers (e.g. [2, 10] ->
 * [10, 2]); a numeric comparator is required.
 */
fastForwardFramesAndThenPruneBuffer() {
  const sortedFrames = Object.keys(this.cache)
    .map((key) => Number(key))
    .sort((a, b) => a - b);
  let nextSequence: number | undefined = undefined;
  let shouldSkip = false;
  sortedFrames.forEach((sequence: number, index: number) => {
    const image = this.cache[sequence];
    if (index === 0) {
      // Always start from the earliest buffered frame.
      nextSequence = sequence;
    } else if (sequence === sortedFrames[index - 1] + 1) {
      // Contiguous with the previous buffered frame — keep fast-forwarding.
      nextSequence = sequence;
    } else if (
      this.sequence &&
      sequence > this.sequence &&
      image?.frameType === May.image_t.FrameType.KEY
    ) {
      // A future key frame is independently decodable; jump to it.
      nextSequence = sequence;
    } else {
      // Gap with no key frame: stop decoding from here on.
      shouldSkip = true;
      nextSequence = undefined;
    }

    if (nextSequence !== undefined && !shouldSkip) {
      const nextImage = this.cache[nextSequence];
      this._decodeAndSetTimestampAndSequence(
        nextImage,
        getFrameType(nextImage)
      );
      delete this.cache[nextSequence];
    } else if (this.sequence && sequence < this.sequence) {
      // Frame is behind the playback position and will never be used.
      delete this.cache[sequence];
    }
  });
}

/**
 * Plays every cached frame that immediately follows the current playback
 * position, stopping at the first gap. `this.sequence` advances as each
 * frame is decoded, so `nextSequence` is recomputed per iteration.
 *
 * Bug fix: the default `Array.prototype.sort()` compares numbers as
 * strings, mis-ordering sequences like [2, 10]; use a numeric comparator.
 */
playQueuedFollowUpFrames() {
  const sortedFrames = Object.keys(this.cache)
    .map((key) => Number(key))
    .sort((a, b) => a - b);
  if (!this.sequence || sortedFrames.length === 0) {
    return;
  }
  for (let index = 0; index < sortedFrames.length; ++index) {
    const currentSequence = sortedFrames[index];
    // this.sequence will be updated when a frame is decoded
    const nextSequence = this.sequence! + 1;
    if (currentSequence < nextSequence) {
      // Already played; skip.
      continue;
    } else if (currentSequence === nextSequence) {
      const nextFrame = this.cache[currentSequence];
      if (nextFrame) {
        this._decodeAndSetTimestampAndSequence(
          nextFrame,
          getFrameType(nextFrame)
        );
      }
    } else {
      // Gap in the buffered sequence; stop here.
      break;
    }
  }
}

/**
 * Drops every cached frame whose sequence number is below `min_sequence`.
 * Such frames are behind the playback position and can never be played.
 */
pruneOlderFramesFromBuffer(min_sequence: number) {
  for (const key of Object.keys(this.cache)) {
    const sequenceNumber = Number(key);
    if (sequenceNumber < min_sequence) {
      delete this.cache[sequenceNumber];
    }
  }
}


/**
 * Maps the protobuf frame type onto the WebCodecs chunk type: KEY frames
 * decode independently ('key'); everything else is treated as 'delta'.
 */
function getFrameType(image: May.image_t): EncodedVideoChunkType {
  if (image.frameType === May.image_t.FrameType.KEY) {
    return 'key';
  }
  return 'delta';
}

More information on deserialization, including protobuf packages specific to any programming language being used and other key notes can be found here.