一.前言
FFplay是著名的音视频开源项目FFmpeg自带的支持多协议播放的命令行播放器,我们熟悉的手机端播放器内核ijkplayer也是基于ffplay开发。QUIC最早是由google推出的基于UDP的可靠传输协议,目前已经被IETF接纳,并基于其上推出了正式标准RFC9000。QUIC是HTTP3协议的底层传输协议。QuicPro是京东自研的QUIC实现,遵循RFC9000标准,目前已经集成到京东APP、京喜APP等产品线,并稳定运行。
QUIC协议基于带宽瓶颈预测及环路延时预测的传输控制算法,相比TCP的基于网络拥塞的控制算法,在抗网络抖动、提高网络传输速率方面有显著优势。根据QuicPro在线上Android应用内的直播场景中的使用情况统计数据显示,直播卡顿率较TCP由1.43%降为1.14%,降幅为20%,首开时间较TCP的949毫秒降为659毫秒,降幅为30.5%,如下图所示:
图1.TCP与QUIC分别在直播卡顿率及首开时间上的对比
本文将介绍如何将QuicPro协议栈实现集成进ffplay及ijkplayer这类基于FFmpeg框架的播放器。
二.FFplay架构及工作流程
FFplay是一个非常强大而又精简的播放器,除了支持多种文件格式、网络协议及输入源外,没有多余的界面及功能。ffplay内部架构及功能模块涉及数据流读取、解封装、视频解码、音频解码、视频渲染、音频播放、音视频同步等,ffplay使用SDL进行音频播放及视频图像渲染。下图是ffplay内部线程设置及工作流程图:
图2.FFplay内部线程及工作流程
如上图所示,ffplay内部共创建5个线程,具体功能为:
- 主线程:视频渲染播放/用户事件及消息处理,以及音视频同步逻辑;
- 解复用线程(读取线程):从本地文件、网络连接及输入设备中读取数据流,并调用数据流解析器对数据格式进行判断并解析,之后将数据包推入相应的队列;
- 视频解码线程:创建视频解码器,从video packets队列缓冲区读取视频数据包,解码后将视频帧推入 video frame队列,供渲染线程使用;
- 音频解码线程:创建音频解码器,从audio packets队列缓冲区读取音频数据包,解码后将音频帧推入audio frame队列,供音频播放线程使用;
- 音频播放线程:调用音频设备驱动接口,创建音频播放器实例,并将音频数据从audio frame中取出播放;
由于ffplay当前还不支持QUIC传输协议,我们需要扩展ffplay以自定义网络协议的方式支持QUIC传输协议。FFplay通过FFmpeg内部精心设计的插件架构来支持多种输入协议及格式。输入的数据流一般分为串流和结构数据,文件流、网络数据流即属于串流,这类流是遵循一定格式规范的经过序列化处理的二进制数据流。每一种串流都需要自己独一无二的scheme来标识自己,并遵循ffmpeg协议结构来实现相应的open/read/write/seek/close回调函数,解复用线程通过调用avformat_open_input识别串流协议格式,并调用相应的串流数据操作接口。下图介绍了串流协议的注册与使用流程:
图3.串流数据输入协议的注册与使用流程
三.FFplay集成QuicPro详解
1.QuicPro动态库载入
为方便业务方集成,减少编译依赖,QuicPro协议插件设计成可动态加载的方式,下述代码支持windows/Android/iOS等平台动态加载QuicPro相应的动态库。
#include "avformat.h"
#include "network.h"
#include "url.h"
#include "libavutil/opt.h"
#include "libavutil/avassert.h"
#include "libavutil/thread.h"
#include "libavutil/fifo.h"
#include "libavutil/log.h"
#include <stdatomic.h>
// A set of macros to use for platform detection.
#if defined(__ANDROID_API__)
#define OS_ANDROID 1
#elif defined(__APPLE__)
// only include TargetConditions after testing ANDROID as some android builds
// on mac don't have this header available and it's not needed unless the target
// is really mac/ios.
#include <TargetConditionals.h>
#define OS_MACOSX 1
#if defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE
#define OS_IOS 1
#endif // defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE
#elif defined(__linux__)
#define OS_LINUX 1
// include a system header to pull in features.h for glibc/uclibc macros.
#include <unistd.h>
#if defined(__GLIBC__) && !defined(__UCLIBC__)
// we really are using glibc, not uClibc pretending to be glibc
#define LIBC_GLIBC 1
#endif
#elif defined(_WIN32)
#define OS_WIN 1
#define TOOLKIT_VIEWS 1
#elif defined(__FreeBSD__)
#define OS_FREEBSD 1
#elif defined(__NetBSD__)
#define OS_NETBSD 1
#elif defined(__OpenBSD__)
#define OS_OPENBSD 1
#elif defined(__sun)
#define OS_SOLARIS 1
#elif defined(__QNXNTO__)
#define OS_QNX 1
#else
#error Please add support for your platform in build/build_config.h
#endif
#undef MIN
#define MIN(l,o) ((l) < (o) ? (l) : (o))
#undef MAX
#define MAX(h,i) ((h) > (i) ? (h) : (i))
#ifdef _MSC_VER
#pragma warning(disable:4828)
#define _Atomic
#endif // _MSC_VER
#include "quicpro.h"
typedef
QPRESULT
(*QUICPRO_INITIALIZE_FN)(int log_level, int quic_version);
typedef
QPRESULT
(*QUICPRO_SET_LOG_CALLBACK_FN)(quicpro_log_callback callback, void* ctx);
typedef
quicpro_request_t*
(*QUICPRO_CREATE_REQUEST_FN)(
quicpro_send_callback send_cb,
quicpro_response_header_callback resp_header_cb,
quicpro_response_body_callback resp_body_cb,
quicpro_response_finish_callback resp_finish_cb,
quicpro_close_callback close_cb,
void* ctx);
typedef
QPRESULT
(*QUICPRO_REQUEST_SET_METHOD_FN)(quicpro_request_t* req, const char* method);
typedef
QPRESULT
(*QUICPRO_REQUEST_SET_SCHEME_FN)(quicpro_request_t* req, const char* scheme);
typedef
QPRESULT
(*QUICPRO_REQUEST_SET_HOSTNAME_FN)(quicpro_request_t* req, const char* hostname);
typedef
QPRESULT
(*QUICPRO_REQUEST_SET_SERVICE_PORT_FN)(quicpro_request_t* req, const char* svc_port);
typedef
QPRESULT
(*QUICPRO_REQUEST_SET_PATH_FN)(quicpro_request_t* req, const char* path);
typedef
QPRESULT
(*QUICPRO_REQUEST_SET_CONNECT_TIMEOUT_FN)(quicpro_request_t* req, int milliseconds);
typedef
QPRESULT
(*QUICPRO_REQUEST_SEND_FN)(quicpro_request_t* request);
typedef
QPRESULT
(*QUICPRO_REQUEST_CLOSE_FN)(quicpro_request_t** request);
typedef
QPRESULT
(*QUICPRO_FREE_BUFFER_FN)(void* buffer);
#ifdef OS_ANDROID
int get_so_full_path(
const char* self_name,
const char* dest_name,
char* out_buffer,
int out_buff_len) {
int ret = 0;
const size_t SELF_NAME_LEN = strlen(self_name);
char linebuf[512];
FILE* fmap = fopen("/proc/self/maps", "r");
if (!fmap) {
ret = -1;
goto return_result;
}
memset(out_buffer, 0, out_buff_len);
while (fgets(linebuf, sizeof(linebuf), fmap)) {
uintptr_t begin, end;
char perm[10], offset[20], dev[10], inode[20], path_mem[256], * path;
int nr = sscanf(linebuf, "%zx-%zx %s %s %s %s %s", &begin, &end, perm,
offset, dev, inode, path_mem);
if (nr == 6) {
path = NULL;
}
else {
if (nr != 7) {
ret = -2;
goto close_file;
}
path = path_mem;
}
if (path) {
size_t len = strlen(path);
char * last_dir_end = path + len - SELF_NAME_LEN;
if (!strcmp(last_dir_end, self_name)) {
int path_len = strlen(path);
last_dir_end[1] = 0;
strncpy(out_buffer, path, MIN(path_len, out_buff_len));
if (path_len < out_buff_len) {
out_buffer[path_len] = 0;
strcat(out_buffer, dest_name);
}
goto close_file;
}
}
}
close_file:
fclose(fmap);
return_result:
return ret;
}
#endif // OS_ANDROID
/* Obviously we can't use SDL_LoadObject() to load SDL. :) */
/* Also obviously, we never close the loaded library. */
#if defined(WIN32) || defined(_WIN32) || defined(__CYGWIN__)
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN 1
#endif
#include <windows.h>
static void* get_quicpro_func(const char* fname, const char* sym)
{
HANDLE lib = LoadLibraryA(fname);
void* retval = NULL;
if (lib) {
retval = GetProcAddress(lib, sym);
if (retval == NULL) {
FreeLibrary(lib);
}
}
return retval;
}
#define DL_NAME "quicpro.dll"
#elif defined(OS_LINUX) || defined(OS_ANDROID) || defined(OS_MACOSX)
#include <dlfcn.h>
static void* get_quicpro_func(const char* fname, const char* sym)
{
#ifdef OS_ANDROID
char path[512];
void* lib = NULL;
{
int ret = get_so_full_path("/libijkffmpeg.so", fname, path, sizeof(path));
if (ret >= 0)
{
lib = dlopen(path, RTLD_NOW | RTLD_LOCAL);
}
}
#else // !OS_ANDROID
void* lib = dlopen(fname, RTLD_NOW | RTLD_LOCAL);
#endif // OS_ANDROID
void* retval = NULL;
if (lib != NULL) {
retval = dlsym(lib, sym);
if (retval == NULL) {
dlclose(lib);
}
}
return retval;
}
#if defined(OS_ANDROID) || defined(OS_LINUX)
#define DL_NAME "libquicpro.so"
#elif defined(OS_MACOSX)
#define DL_NAME "libquicpro.dylib"
#endif // OS_ANDROID || OS_LINUX
#else
#error Please define your platform.
#endif
static QUICPRO_INITIALIZE_FN quicpro_initialize_fn = NULL;
static QUICPRO_SET_LOG_CALLBACK_FN quicpro_set_log_callback_fn = NULL;
static QUICPRO_CREATE_REQUEST_FN quicpro_create_request_fn = NULL;
static QUICPRO_REQUEST_SET_METHOD_FN quicpro_request_set_method_fn = NULL;
static QUICPRO_REQUEST_SET_SCHEME_FN quicpro_request_set_scheme_fn = NULL;
static QUICPRO_REQUEST_SET_HOSTNAME_FN quicpro_request_set_hostname_fn = NULL;
static QUICPRO_REQUEST_SET_SERVICE_PORT_FN quicpro_request_set_service_port_fn = NULL;
static QUICPRO_REQUEST_SET_PATH_FN quicpro_request_set_path_fn = NULL;
static QUICPRO_REQUEST_SET_CONNECT_TIMEOUT_FN quicpro_request_set_connect_timeout_fn = NULL;
static QUICPRO_REQUEST_SEND_FN quicpro_request_send_fn = NULL;
static QUICPRO_REQUEST_CLOSE_FN quicpro_request_close_fn = NULL;
static QUICPRO_FREE_BUFFER_FN quicpro_free_buffer_fn = NULL;
// Get quicpro function pointers
static int get_quicpro_functions()
{
#define LOAD_FUNC(func_name) \
if (func_name##_fn == NULL) \
func_name##_fn = get_quicpro_func(DL_NAME, #func_name); \
if (func_name##_fn == NULL) \
return -1;
LOAD_FUNC(quicpro_initialize);
LOAD_FUNC(quicpro_initialize);
LOAD_FUNC(quicpro_set_log_callback);
LOAD_FUNC(quicpro_create_request);
LOAD_FUNC(quicpro_request_set_method);
LOAD_FUNC(quicpro_request_set_scheme);
LOAD_FUNC(quicpro_request_set_hostname);
LOAD_FUNC(quicpro_request_set_service_port);
LOAD_FUNC(quicpro_request_set_path);
LOAD_FUNC(quicpro_request_set_connect_timeout);
LOAD_FUNC(quicpro_request_send);
LOAD_FUNC(quicpro_request_close);
LOAD_FUNC(quicpro_free_buffer);
#undef LOAD_FUNC
return 0;
}
2.多线程模型及FIFO队列
QuicPro动态库有自己独立的I/O线程,ffplay也有自己独立的串流读取线程,要实现quic串流协议,需要进行多线程操作,我们基于ffmpeg原生的AVFiFoBuffer设计了RingBuffer结构,用于QuicPro和ffplay之间的数据传输。
图4.通过FIFO队列实现线程间数据传输
#define BUFFER_CAPACITY (4 * 1024 * 1024)
#define READ_BACK_CAPACITY (4 * 1024 * 1024)
#define SHORT_SEEK_THRESHOLD (256 * 1024)
typedef struct RingBuffer
{
AVFifoBuffer* fifo;
int read_back_capacity;
int read_pos;
} RingBuffer;
static int ring_init(RingBuffer* ring, unsigned int capacity, int read_back_capacity)
{
memset(ring, 0, sizeof(RingBuffer));
ring->fifo = av_fifo_alloc(capacity + read_back_capacity);
if (!ring->fifo)
return AVERROR(ENOMEM);
ring->read_back_capacity = read_back_capacity;
return 0;
}
static void ring_destroy(RingBuffer* ring)
{
av_fifo_freep(&ring->fifo);
}
static void ring_reset(RingBuffer* ring)
{
av_fifo_reset(ring->fifo);
ring->read_pos = 0;
}
static int ring_size(RingBuffer* ring)
{
return av_fifo_size(ring->fifo) - ring->read_pos;
}
static int ring_space(RingBuffer* ring)
{
return av_fifo_space(ring->fifo);
}
static int ring_generic_read(RingBuffer* ring, void* dest, int buf_size, void (*func)(void*, void*, int))
{
int ret;
av_assert2(buf_size <= ring_size(ring));
ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func);
ring->read_pos += buf_size;
if (ring->read_pos > ring->read_back_capacity) {
av_fifo_drain(ring->fifo, ring->read_pos - ring->read_back_capacity);
ring->read_pos = ring->read_back_capacity;
}
return ret;
}
static int ring_generic_write(RingBuffer* ring, void* src, int size, int (*func)(void*, void*, int))
{
av_assert2(size <= ring_space(ring));
return av_fifo_generic_write(ring->fifo, src, size, func);
}
static int ring_size_of_read_back(RingBuffer* ring)
{
return ring->read_pos;
}
static int ring_drain(RingBuffer* ring, int offset)
{
av_assert2(offset >= -ring_size_of_read_back(ring));
av_assert2(offset <= -ring_size(ring));
ring->read_pos += offset;
return 0;
}
3.协议定义及参数解析
这一步主要是定义QUIC串流协议的私有数据结构,及需要传递的参数信息,这些参数通过ffmpeg统一的参数解析框架进行解析,我们只需要定义好参数名、参数类型、默认值等。
typedef struct QuicContext {
const AVClass *class;
int quic_establish_timeout;
int seekable;
int block_size; //-1:Default 1MB block size, 0:Not request block, notice MUST BE 0 when play a stream, >0:Valid block size.
int block_consume; //-1:Default 50% percent, >=0:Valid percent.
char* service_port;
QuicSyncContext * sync;
} QuicContext;
#define OFFSET(x) offsetof(QuicContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM
#define E AV_OPT_FLAG_ENCODING_PARAM
static const AVOption options[] = {
{ "quic_establish_timeout", "Quic session establish timeout in ms.", OFFSET(quic_establish_timeout),AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags = D|E },
{ "seekable", "Control seekability of connection.", OFFSET(seekable), AV_OPT_TYPE_BOOL, { .i64 = 0 }, -1, 1, .flags = D|E },
{ "block_size", "Request file in blocks with the size.", OFFSET(block_size), AV_OPT_TYPE_INT, { .i64 = 524288 }, -1, INT_MAX, .flags = D|E },
{ "block_consume", "Block consume percent threshold.", OFFSET(block_consume), AV_OPT_TYPE_INT, { .i64 = 50 }, 0, 100, .flags = D|E },
{ "service_port", "set quic service port", OFFSET(service_port), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, .flags = D|E },
{ NULL }
};
#define QUIC_CLASS(flavor) \
static const AVClass flavor ## _context_class = { \
.class_name = # flavor, \
.item_name = av_default_item_name, \
.option = options, \
.version = LIBAVUTIL_VERSION_INT, \
}
QUIC_CLASS(quic);
QUIC_CLASS(quics);
4.数据同步与线程安全
由于是多线程操作,必须考虑数据操作过程中的线程安全问题,这里我们定义一个名为QuicSyncContext的结构体来作为线程同步对象,具体定义见下述代码:
enum {
QP_STATE_INIT = 0,
QP_STATE_RECV_HDR = 1,
QP_STATE_RECV_BODY = 2,
QP_STATE_CLOSED = 3,
};
typedef struct QuicSyncContext {
quicpro_request_t * req;
int inner_io_error;
int io_error;
int io_eof_reached;
int qp_status;
int64_t logical_pos;
RingBuffer ring;
pthread_cond_t cond_wakeup_main;
pthread_cond_t cond_wakeup_close;
pthread_mutex_t mutex;
int abort_request;
URLContext * url_ctx;
_Atomic int ref_count;
} QuicSyncContext;
QuicPro对外提供的建连、接收数据、发送数据、关闭请求等接口都是异步的,作为跟连接相关的上下文对象,QuicSyncContext需要一个引用计数器ref_count来确保正确释放,释放函数如下:
static int quic_sync_destroy(QuicSyncContext** ppc)
{
QuicSyncContext* c;
if (!ppc || !*ppc)
{
return -1;
}
c = *ppc;
if (c && atomic_fetch_add(&c->ref_count, -1) == 1)
{
pthread_cond_destroy(&c->cond_wakeup_close);
pthread_cond_destroy(&c->cond_wakeup_main);
pthread_mutex_destroy(&c->mutex);
ring_destroy(&c->ring);
free(c);
}
*ppc = NULL;
return 0;
}
5.使用QuicPro获取网络串流
使用QuicPro接口前需要按照接口要求实现相应的回调函数:
/**
* 发送数据回调函数
* @method qp_feed_callback
* @param buf 要发送的数据缓冲区容器
* @param size 要发送的数据缓冲区大小
* @param ctx 用户指针
*/
static int qp_feed_callback(unsigned char* buf, int size, void* ctx)
{
URLContext* h = ctx;
QuicContext* s = h->priv_data;
return 0;
}
/**
* 接收HTTP3响应头回调函数
* @method qp_response_header_callback
* @param resp 请求返回的HTTP3响应头对象
* @param ctx 用户指针
*/
static int qp_response_header_callback(quicpro_response_t* resp, void* ctx)
{
QuicSyncContext* c = ctx;
if (resp)
{
pthread_mutex_lock(&c->mutex);
c->qp_status = QP_STATE_RECV_HDR;
pthread_mutex_unlock(&c->mutex);
}
return 0;
}
/**
* 接收HTTP3响应包体数据回调函数
* @method qp_response_body_callback
* @param src 接收到的响应包体数据
* @param size 接收到的响应包体数据大小
* @param ctx 用户指针
*/
static void qp_response_body_callback(unsigned char *src, size_t size, void *ctx)
{
QuicSyncContext* c = ctx;
RingBuffer* ring = &c->ring;
int ret = 0;
int fifo_space;
pthread_mutex_lock(&c->mutex);
if (c->qp_status < QP_STATE_RECV_BODY)
{
c->qp_status = QP_STATE_RECV_BODY;
}
if (c->io_eof_reached) {
pthread_cond_signal(&c->cond_wakeup_main);
pthread_mutex_unlock(&c->mutex);
goto quit;
}
fifo_space = ring_space(ring);
while (fifo_space < size)
{
av_fifo_grow(ring->fifo, size);
fifo_space = ring_space(ring);
}
ret = ring_generic_write(ring, src, size, NULL);
if (ret <= 0) {
c->io_eof_reached = 1;
if (c->inner_io_error < 0)
c->io_error = c->inner_io_error;
}
pthread_cond_signal(&c->cond_wakeup_main);
pthread_mutex_unlock(&c->mutex);
quit:
quicpro_free_buffer_fn(src);
}
/**
* 请求结束回调函数
* @method qp_response_finish_callback
* @param ctx 用户指针
* @param status 状态码
*/
static int qp_response_finish_callback(void* ctx, QPRESULT status)
{
QuicSyncContext* c = ctx;
return 0;
}
/**
* 连接关闭回调函数
* @method qp_close_callback
* @param result 状态码
* @param ctx 用户指针
*/
static int qp_close_callback(int result, void* ctx)
{
QuicSyncContext* c = ctx;
pthread_mutex_lock(&c->mutex);
if (c->abort_request == 1)
{
av_log(NULL, AV_LOG_INFO, "qp_close_callback caused abord "
"request from ffmpeg.");
pthread_cond_signal(&c->cond_wakeup_close);
c->io_error = AVERROR_EOF;
}
else
{
av_log(NULL, AV_LOG_INFO, "qp_close_callback caused abord "
"request from quicpro connection closed.");
c->abort_request = 1;
switch (c->qp_status)
{
case QP_STATE_INIT:
c->io_error = AVERROR(ENETUNREACH);
break;
case QP_STATE_RECV_HDR:
c->io_error = AVERROR_HTTP_NOT_FOUND;
break;
case QP_STATE_RECV_BODY:
c->io_error = AVERROR(EIO);
break;
}
}
c->io_eof_reached = 1;
pthread_cond_signal(&c->cond_wakeup_main);
pthread_mutex_unlock(&c->mutex);
quic_sync_destroy(&c);
return 0;
}
6.实现QUIC串流协议插件
参照ffmpeg串流协议结构定义,定义并实现自定义的协议接口及协议对象,数据读取流程如下图所示:
图5.FFplay数据读取线程同步读取流程
/**
* 打开串流协议
* @method quic_open
* @param h 上下文对象
* @param uri 协议资源地址
* @param flags 控制选项
*/
static int quic_open(URLContext *h, const char *uri, int flags) {
QuicContext *qc = h->priv_data;
int ret = 0;
QPRESULT rv = QP_R_SUCCESS;
char *url = NULL;
int url_len = 0;
const char *http_scheme = "http://";
const char *https_scheme = "https://";
const char *scheme = NULL;
const char *deli = "://";
char *p = NULL;
char hostname[1024] = {0};
char protocol[1024] = {0};
char path[1024] = {0};
char ip[16] = {0};
int port = 0;
char portstr[16] = {0};
struct addrinfo hints = {0};
struct addrinfo *ai = NULL;
struct sockaddr_in *addr = NULL;
QuicSyncContext* c = NULL;
do {
if (uri == NULL || qc->service_port == NULL) {
ret = AVERROR_UNKNOWN;
goto fifo_fail;
}
ret = get_quicpro_functions();
if (ret < 0) {
return AVERROR(ENFILE);
}
av_url_split(protocol, sizeof(protocol), NULL, 0, hostname,
sizeof(hostname), &port, path, sizeof(path), uri);
if (strncmp(protocol, "quic", strlen(protocol)) == 0) {
scheme = http_scheme;
} else if (strncmp(protocol, "quics", strlen(protocol)) == 0) {
scheme = https_scheme;
} else {
ret = AVERROR_PROTOCOL_NOT_FOUND;
goto fifo_fail;
}
if (port <= 0) {
port = 443;
}
// current only support stream data
qc->seekable = 0;
if (qc->seekable == 1) {
h->is_streamed = 0;
} else {
h->is_streamed = 1;
}
//Replace protocol with certain scheme.
p = strstr(uri, deli);
p += strlen(deli);
url_len = strlen(scheme) + strlen(p) + 1;
url = (char*)malloc(url_len);
if (!url) {
av_log(h, AV_LOG_ERROR, "Failed to malloc url buffer.\n");
ret = AVERROR_INVALIDDATA;
goto return_error;
}
memset(url, 0, url_len);
strcpy(url, scheme);
strcat(url, p);
av_log(
h, AV_LOG_INFO,
"quic_open %s, block_size:%d, block_consume:%d, "
"quic_establish_timeout:%d, service_port:%s.\n",
url,
qc->block_size,
qc->block_consume,
qc->quic_establish_timeout,
qc->service_port);
c = calloc(1, sizeof(QuicSyncContext));
if (!c) {
av_log(h, AV_LOG_ERROR, "Failed to malloc QuicSyncContext instance.\n");
ret = AVERROR_INVALIDDATA;
goto malloc_sync_failed;
}
qc->sync = c;
c->ref_count = 2;
c->qp_status = QP_STATE_INIT;
ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
if (ret < 0)
goto fifo_fail;
ret = pthread_mutex_init(&c->mutex, NULL);
if (ret != 0) {
av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
goto mutex_fail;
}
ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
if (ret != 0) {
av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
goto cond_wakeup_main_fail;
}
ret = pthread_cond_init(&c->cond_wakeup_close, NULL);
if (ret != 0) {
av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
goto cond_wakeup_close_fail;
}
rv = quicpro_initialize_fn(QP_LOG_INFO, QPVER_I001);
if (rv != QP_R_SUCCESS)
{
ret = AVERROR_UNKNOWN;
goto quic_initialize_fail;
}
quicpro_set_log_callback_fn(quic_log_callback, NULL);
c->req = quicpro_create_request_fn(qp_feed_callback, qp_response_header_callback,
qp_response_body_callback, qp_response_finish_callback, qp_close_callback, c);
if (!c->req)
{
ret = AVERROR_UNKNOWN;
goto quic_initialize_fail;
}
quicpro_request_set_method_fn(c->req, "GET");
quicpro_request_set_scheme_fn(c->req, "http");
if (strlen(hostname) > 0)
{
quicpro_request_set_hostname_fn(c->req, hostname);
}
if (qc->service_port)
{
quicpro_request_set_service_port_fn(c->req, qc->service_port);
}
if (strlen(path) > 0)
{
quicpro_request_set_path_fn(c->req, path);
}
if (qc->quic_establish_timeout > 0)
{
quicpro_request_set_connect_timeout_fn(c->req, qc->quic_establish_timeout);
}
rv = quicpro_request_send_fn(c->req);
if (rv != QP_R_SUCCESS)
{
ret = AVERROR_UNKNOWN;
goto quic_send_fail;
}
} while (0);
if (url != NULL) {
free(url);
}
av_log(h, AV_LOG_INFO, "quic_open successfully.");
return 0;
quic_send_fail:
quicpro_request_close_fn(&c->req);
quic_initialize_fail:
pthread_cond_destroy(&c->cond_wakeup_close);
cond_wakeup_close_fail:
pthread_cond_destroy(&c->cond_wakeup_main);
cond_wakeup_main_fail:
pthread_mutex_destroy(&c->mutex);
mutex_fail:
ring_destroy(&c->ring);
fifo_fail:
free(c);
qc->sync = NULL;
malloc_sync_failed:
free(url);
return_error:
av_log(h, AV_LOG_ERROR, "quic_open failed(ret=%d).", ret);
return ret;
}
/**
* 从FIFO队列读取数据内部函数
* @method quic_read_internal
* @param h 上下文对象
* @param dest 接收读取的数据的缓冲区容器
* @param size 接收读取的数据的缓冲区容器大小
* @param read_complete 是否完成读取
* @param func 读取函数回调(可选)
*/
static int quic_read_internal(URLContext* h, void* dest, int size, int read_complete,
void (*func)(void*, void*, int))
{
QuicContext* qc = h->priv_data;
QuicSyncContext* c = qc->sync;
RingBuffer* ring = &c->ring;
int to_read = size;
int ret = 0;
int timedOut = 1;
if (!c) {
return 0;
}
pthread_mutex_lock(&c->mutex);
while (to_read > 0) {
int fifo_size, to_copy;
if (c->abort_request) {
ret = c->io_error;
av_log(h, AV_LOG_INFO, "quic_read_internal abort request(ret=%d).", ret);
break;
}
fifo_size = ring_size(ring);
to_copy = FFMIN(to_read, fifo_size);
if (to_copy > 0) {
ring_generic_read(ring, dest, to_copy, func);
if (!func)
dest = (uint8_t*)dest + to_copy;
c->logical_pos += to_copy;
to_read -= to_copy;
ret = size - to_read;
if (to_read <= 0 || !read_complete)
break;
}
else if (c->io_eof_reached) {
if (ret <= 0) {
if (c->io_error)
ret = c->io_error;
else
ret = AVERROR_EOF;
}
av_log(h, AV_LOG_INFO, "quic_read_internal reach eof(ret=%d).", ret);
break;
}
do {
if (ff_check_interrupt(&h->interrupt_callback))
{
ret = 0;
goto unlock_and_return;
}
else if (c->abort_request)
{
ret = c->io_error;
goto unlock_and_return;
}
timedOut = timedWait(&c->cond_wakeup_main, &c->mutex, 10);
} while (timedOut);
}
unlock_and_return:
pthread_mutex_unlock(&c->mutex);
return ret;
}
/**
* 从串流协议读取数据回调函数
* @method quic_read
* @param h 上下文对象
* @param buf 接收读取的数据的缓冲区容器
* @param size 接收读取的数据的缓冲区容器大小
*/
static int quic_read(URLContext* h, unsigned char* buf, int size)
{
return quic_read_internal(h, buf, size, 0, NULL);
}
/**
* 向串流协议写数据回调函数
* @method quic_write
* @param h 上下文对象
* @param buf 接收读取的数据的缓冲区容器
* @param size 接收读取的数据的缓冲区容器大小
*/
static int quic_write(URLContext *h, const uint8_t *buf, int size) {
QuicContext *s = h->priv_data;
return 0;
}
/**
* 串流协议seek回调函数
* @method quic_seek
* @param h 上下文对象
* @param off 便宜量
* @param whence 控制参数
*/
static int64_t quic_seek(URLContext *h, int64_t off, int whence) {
QuicContext *s = NULL;
int64_t ret = -1;
if (h->is_streamed) {
if (whence == AVSEEK_SIZE) {
return UINT64_MAX;
} else {
return AVERROR(ENOSYS);
}
}
s = h->priv_data;
if (ret < 0) {
ret = AVERROR(ENOSYS);
}
return ret;
}
/**
* 关闭串流协议回调函数
* @method quic_close
* @param h 上下文对象
*/
static int quic_close(URLContext* h)
{
QuicContext* qc = h->priv_data;
QuicSyncContext* c = qc->sync;
int ret;
pthread_mutex_lock(&c->mutex);
av_log(h, AV_LOG_INFO, "closing quic handle %p.", c->req);
ret = quicpro_request_close_fn(&c->req);
av_log(h, AV_LOG_INFO, "quicpro_request_close return %d.", ret);
if (!c->abort_request)
{
av_log(h, AV_LOG_INFO, "quic_close abort request from ffmpeg.");
c->abort_request = 1;
}
pthread_mutex_unlock(&c->mutex);
quic_sync_destroy(&qc->sync);
av_log(h, AV_LOG_INFO, "quic_close finished.");
return 0;
}
/**
* 获取串流协议句柄回调函数
* @method quic_get_file_handle
* @param h 上下文对象
*/
static int quic_get_file_handle(URLContext *h) {
QuicContext *s = h->priv_data;
return -1;
}
/**
* quic scheme对象定义
*/
const URLProtocol ff_quic_protocol = {
.name = "quic",
.url_open = quic_open,
.url_read = quic_read,
.url_write = quic_write,
.url_seek = quic_seek,
.url_close = quic_close,
.url_get_file_handle = quic_get_file_handle,
.priv_data_size = sizeof(QuicContext),
.priv_data_class = &quic_context_class,
.flags = URL_PROTOCOL_FLAG_NETWORK,
.default_whitelist = "quic,quics"
};
/**
* quics scheme对象定义
*/
const URLProtocol ff_quics_protocol = {
.name = "quics",
.url_open = quic_open,
.url_read = quic_read,
.url_write = quic_write,
.url_seek = quic_seek,
.url_close = quic_close,
.url_get_file_handle = quic_get_file_handle,
.priv_data_size = sizeof(QuicContext),
.priv_data_class = &quics_context_class,
.flags = URL_PROTOCOL_FLAG_NETWORK,
.default_whitelist = "quic,quics"
};
QuicPro通过HTTP3协议传输过来的数据流是京东直播数据流,数据流格式是FLV串流,FFmpeg 4.0及以上版本都支持FLV串流的解析及播放。集成上述QUIC协议后,使用方法如下:
./ffplay.exe quic://jdpull.jd.com/live/10332524_fhd.flv -service_port "quicpro.jd.com:20000"
service_port 参数填写QUIC边缘服务器服务地址和端口。
四.结语
本文介绍了将QuicPro协议栈实现集成进ffplay及ijkplayer这类基于FFmpeg框架的播放器的代码及流程,展示了FFmpeg平台的强大与可扩展性。直播数据因数据量大、占用带宽高、传输距离长而符合典型的Long-Fat(长肥)网络定义。在应用QUIC技术之前,京东的直播产品都是使用基于TCP的协议(RTMP/HTTP-FLV/HLS等)进行直播流数据传输,TCP因其保守的拥堵控制策略,在应对Long-Fat网络应用场景时差强人意,QUIC技术的出现,为优化直播传输质量提供了新的选择。