开发者社区 > 博文 > FFplay集成QuicPro支持QUIC直播详解
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

FFplay集成QuicPro支持QUIC直播详解

  • an****
  • 2022-09-08
  • IP归属:北京
  • 36560浏览

    一.前言

    FFplay是著名的音视频开源项目FFmpeg自带的支持多协议播放的命令行播放器,我们熟悉的手机端播放器内核ijkplayer也是基于ffplay开发。QUIC最早是由google推出的基于UDP的可靠传输协议,目前已经被IETF接纳,并基于其上推出了正式标准RFC9000。QUIC是HTTP3协议的底层传输协议。QuicPro是京东自研的QUIC实现,遵循RFC9000标准,目前已经集成到京东APP、京喜APP等产品线,并稳定运行。

    QUIC协议基于带宽瓶颈预测及环路延时预测的传输控制算法,相比TCP的基于网络拥塞的控制算法,在抗网络抖动、提高网络传输速率方面有显著优势。根据QuicPro在线上Android应用内的直播场景中的使用情况统计数据显示,直播卡顿率较TCP由1.43%降为1.14%,降幅为20%,首开时间较TCP的949毫秒降为659毫秒,降幅为30.5%,如下图所示:

    图1.TCP与QUIC分别在直播卡顿率及首开时间上的对比

    本文将介绍如何将QuicPro协议栈实现集成进ffplay及ijkplayer这类基于FFmpeg框架的播放器。

    二.FFplay架构及工作流程

    FFplay是一个非常强大而又精简的播放器,除了支持多种文件格式、网络协议及输入源外,没有多余的界面及功能。ffplay内部架构及功能模块涉及数据流读取、解封装、视频解码、音频解码、视频渲染、音频播放、音视频同步等,ffplay使用SDL进行音频播放及视频图像渲染。下图是ffplay内部线程设置及工作流程图:

    图2.FFplay内部线程及工作流程

    如上图所示,ffplay内部共创建5个线程,具体功能为:

    1. 主线程:视频渲染播放/用户事件及消息处理,以及音视频同步逻辑;
    2. 解复用线程(读取线程):从本地文件、网络连接及输入设备中读取数据流,并调用数据流解析器对数据格式进行判断并解析,之后将数据包推入相应的队列;
    3. 视频解码线程:创建视频解码器,从video packets队列缓冲区读取视频数据包,解码后将视频帧推入 video frame队列,供渲染线程使用;
    4. 音频解码线程:创建音频解码器,从audio packets队列缓冲区读取音频数据包,解码后将音频帧推入audio frame队列,供音频播放线程使用;
    5. 音频播放线程:调用音频设备驱动接口,创建音频播放器实例,并将音频数据从audio frame中取出播放;

    由于ffplay当前还不支持QUIC传输协议,我们需要扩展ffplay以自定义网络协议的方式支持QUIC传输协议。FFplay通过FFmpeg内部精心设计的插件架构来支持多种输入协议及格式。输入的数据流一般分为串流和结构数据,文件流、网络数据流即属于串流,这类流是遵循一定格式规范的经过序列化处理的二进制数据流。每一种串流都需要自己独一无二的scheme来标识自己,并遵循ffmpeg协议结构来实现相应的open/read/write/seek/close回调函数,解复用线程通过调用avformat_open_input识别串流协议格式,并调用相应的串流数据操作接口。下图介绍了串流协议的注册与使用流程:

    图3.串流数据输入协议的注册与使用流程


    三.FFplay集成QuicPro详解

    1.QuicPro动态库载入

    为方便业务方集成,减少编译依赖,QuicPro协议插件设计成可动态加载的方式,下述代码支持windows/Android/iOS等平台动态加载QuicPro相应的动态库。

    #include "avformat.h"
    #include "network.h"
    #include "url.h"
    #include "libavutil/opt.h"
    #include "libavutil/avassert.h"
    #include "libavutil/thread.h"
    #include "libavutil/fifo.h"
    #include "libavutil/log.h"
    #include <stdatomic.h>
    
    
    // A set of macros to use for platform detection.
    #if defined(__ANDROID_API__)
    #define OS_ANDROID 1
    #elif defined(__APPLE__)
    // only include TargetConditions after testing ANDROID as some android builds
    // on mac don't have this header available and it's not needed unless the target
    // is really mac/ios.
    #include <TargetConditionals.h>
    #define OS_MACOSX 1
    #if defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE
    #define OS_IOS 1
    #endif  // defined(TARGET_OS_IPHONE) && TARGET_OS_IPHONE
    #elif defined(__linux__)
    #define OS_LINUX 1
    // include a system header to pull in features.h for glibc/uclibc macros.
    #include <unistd.h>
    #if defined(__GLIBC__) && !defined(__UCLIBC__)
    // we really are using glibc, not uClibc pretending to be glibc
    #define LIBC_GLIBC 1
    #endif
    #elif defined(_WIN32)
    #define OS_WIN 1
    #define TOOLKIT_VIEWS 1
    #elif defined(__FreeBSD__)
    #define OS_FREEBSD 1
    #elif defined(__NetBSD__)
    #define OS_NETBSD 1
    #elif defined(__OpenBSD__)
    #define OS_OPENBSD 1
    #elif defined(__sun)
    #define OS_SOLARIS 1
    #elif defined(__QNXNTO__)
    #define OS_QNX 1
    #else
    #error Please add support for your platform in build/build_config.h
    #endif
    
    #undef MIN
    #define MIN(l,o) ((l) < (o) ? (l) : (o))
    #undef MAX
    #define MAX(h,i) ((h) > (i) ? (h) : (i))
    
    
    #ifdef _MSC_VER
    #pragma warning(disable:4828)
    #define _Atomic
    #endif // _MSC_VER
    #include "quicpro.h"
    
    typedef
    QPRESULT 
    (*QUICPRO_INITIALIZE_FN)(int log_level, int quic_version);
    typedef
    QPRESULT
    (*QUICPRO_SET_LOG_CALLBACK_FN)(quicpro_log_callback callback, void* ctx);
    typedef
    quicpro_request_t*
    (*QUICPRO_CREATE_REQUEST_FN)(
        quicpro_send_callback send_cb,
        quicpro_response_header_callback resp_header_cb,
        quicpro_response_body_callback resp_body_cb,
        quicpro_response_finish_callback resp_finish_cb,
        quicpro_close_callback close_cb,
        void* ctx);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SET_METHOD_FN)(quicpro_request_t* req, const char* method);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SET_SCHEME_FN)(quicpro_request_t* req, const char* scheme);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SET_HOSTNAME_FN)(quicpro_request_t* req, const char* hostname);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SET_SERVICE_PORT_FN)(quicpro_request_t* req, const char* svc_port);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SET_PATH_FN)(quicpro_request_t* req, const char* path);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SET_CONNECT_TIMEOUT_FN)(quicpro_request_t* req, int milliseconds);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_SEND_FN)(quicpro_request_t* request);
    typedef
    QPRESULT
    (*QUICPRO_REQUEST_CLOSE_FN)(quicpro_request_t** request);
    typedef
    QPRESULT
    (*QUICPRO_FREE_BUFFER_FN)(void* buffer);
    
    #ifdef OS_ANDROID
    int get_so_full_path(
        const char* self_name,
        const char* dest_name,
        char* out_buffer,
        int out_buff_len) {
        int ret = 0;
        const size_t SELF_NAME_LEN = strlen(self_name);
        char linebuf[512];
        FILE* fmap = fopen("/proc/self/maps", "r");
        if (!fmap) {
            ret = -1;
            goto return_result;
        }
        memset(out_buffer, 0, out_buff_len);
        while (fgets(linebuf, sizeof(linebuf), fmap)) {
            uintptr_t begin, end;
            char perm[10], offset[20], dev[10], inode[20], path_mem[256], * path;
            int nr = sscanf(linebuf, "%zx-%zx %s %s %s %s %s", &begin, &end, perm,
                offset, dev, inode, path_mem);
            if (nr == 6) {
                path = NULL;
            }
            else {
                if (nr != 7) {
                    ret = -2;
                    goto close_file;
                }
                path = path_mem;
            }
            if (path) {
                size_t len = strlen(path);
                char * last_dir_end = path + len - SELF_NAME_LEN;
                if (!strcmp(last_dir_end, self_name)) {
                    int path_len = strlen(path);
                    last_dir_end[1] = 0;
                    strncpy(out_buffer, path, MIN(path_len, out_buff_len));
                    if (path_len < out_buff_len) {
                        out_buffer[path_len] = 0;
                        strcat(out_buffer, dest_name);
                    }
                    goto close_file;
                }
            }
        }
    
    close_file:
        fclose(fmap);
    return_result:
        return ret;
    }
    #endif // OS_ANDROID
    
    /* Obviously we can't use SDL_LoadObject() to load SDL.  :)  */
    /* Also obviously, we never close the loaded library. */
    #if defined(WIN32) || defined(_WIN32) || defined(__CYGWIN__)
    #ifndef WIN32_LEAN_AND_MEAN
    #define WIN32_LEAN_AND_MEAN 1
    #endif
    #include <windows.h>
    static void* get_quicpro_func(const char* fname, const char* sym)
    {
        HANDLE lib = LoadLibraryA(fname);
        void* retval = NULL;
        if (lib) {
            retval = GetProcAddress(lib, sym);
            if (retval == NULL) {
                FreeLibrary(lib);
            }
        }
        return retval;
    }
    #define DL_NAME "quicpro.dll"
    #elif defined(OS_LINUX) || defined(OS_ANDROID) || defined(OS_MACOSX)
    #include <dlfcn.h>
    static void* get_quicpro_func(const char* fname, const char* sym)
    {
    #ifdef OS_ANDROID
        char path[512];
        void* lib = NULL;
        {
            int ret = get_so_full_path("/libijkffmpeg.so", fname, path, sizeof(path));
            if (ret >= 0)
            {
                lib = dlopen(path, RTLD_NOW | RTLD_LOCAL);
            }
        }
    #else // !OS_ANDROID
        void* lib = dlopen(fname, RTLD_NOW | RTLD_LOCAL);
    #endif // OS_ANDROID
        void* retval = NULL;
        if (lib != NULL) {
            retval = dlsym(lib, sym);
            if (retval == NULL) {
                dlclose(lib);
            }
        }
        return retval;
    }
    #if defined(OS_ANDROID) || defined(OS_LINUX)
    #define DL_NAME "libquicpro.so"
    #elif defined(OS_MACOSX)
    #define DL_NAME "libquicpro.dylib"
    #endif // OS_ANDROID || OS_LINUX
    #else
    #error Please define your platform.
    #endif
    
    static QUICPRO_INITIALIZE_FN                    quicpro_initialize_fn = NULL;
    static QUICPRO_SET_LOG_CALLBACK_FN              quicpro_set_log_callback_fn = NULL;
    static QUICPRO_CREATE_REQUEST_FN                quicpro_create_request_fn = NULL;
    static QUICPRO_REQUEST_SET_METHOD_FN            quicpro_request_set_method_fn = NULL;
    static QUICPRO_REQUEST_SET_SCHEME_FN            quicpro_request_set_scheme_fn = NULL;
    static QUICPRO_REQUEST_SET_HOSTNAME_FN          quicpro_request_set_hostname_fn = NULL;
    static QUICPRO_REQUEST_SET_SERVICE_PORT_FN      quicpro_request_set_service_port_fn = NULL;
    static QUICPRO_REQUEST_SET_PATH_FN              quicpro_request_set_path_fn = NULL;
    static QUICPRO_REQUEST_SET_CONNECT_TIMEOUT_FN   quicpro_request_set_connect_timeout_fn = NULL;
    static QUICPRO_REQUEST_SEND_FN                  quicpro_request_send_fn = NULL;
    static QUICPRO_REQUEST_CLOSE_FN                 quicpro_request_close_fn = NULL;
    static QUICPRO_FREE_BUFFER_FN                   quicpro_free_buffer_fn = NULL;
    
    // Get quicpro function pointers
    static int get_quicpro_functions()
    {
    #define LOAD_FUNC(func_name)  \
        if (func_name##_fn == NULL)  \
            func_name##_fn = get_quicpro_func(DL_NAME, #func_name); \
        if (func_name##_fn == NULL) \
            return -1;
    
        LOAD_FUNC(quicpro_initialize);
        LOAD_FUNC(quicpro_initialize);
        LOAD_FUNC(quicpro_set_log_callback);
        LOAD_FUNC(quicpro_create_request);
        LOAD_FUNC(quicpro_request_set_method);
        LOAD_FUNC(quicpro_request_set_scheme);
        LOAD_FUNC(quicpro_request_set_hostname);
        LOAD_FUNC(quicpro_request_set_service_port);
        LOAD_FUNC(quicpro_request_set_path);
        LOAD_FUNC(quicpro_request_set_connect_timeout);
        LOAD_FUNC(quicpro_request_send);
        LOAD_FUNC(quicpro_request_close);
        LOAD_FUNC(quicpro_free_buffer);
    #undef LOAD_FUNC
        return 0;
    }
    
    

    2.多线程模型及FIFO队列

    QuicPro动态库有自己独立的I/O线程,ffplay也有自己独立的串流读取线程,要实现quic串流协议,需要进行多线程操作,我们基于ffmpeg原生的AVFiFoBuffer设计了RingBuffer结构,用于QuicPro和ffplay之间的数据传输。

    图4.通过FIFO队列实现线程间数据传输

    
    #define BUFFER_CAPACITY         (4 * 1024 * 1024)
    #define READ_BACK_CAPACITY      (4 * 1024 * 1024)
    #define SHORT_SEEK_THRESHOLD    (256 * 1024)
    
    typedef struct RingBuffer
    {
    	AVFifoBuffer* fifo;
    	int           read_back_capacity;
    	int           read_pos;
    } RingBuffer;
    
    
    
    static int ring_init(RingBuffer* ring, unsigned int capacity, int read_back_capacity)
    {
    	memset(ring, 0, sizeof(RingBuffer));
    	ring->fifo = av_fifo_alloc(capacity + read_back_capacity);
    	if (!ring->fifo)
    		return AVERROR(ENOMEM);
    
    	ring->read_back_capacity = read_back_capacity;
    	return 0;
    }
    
    static void ring_destroy(RingBuffer* ring)
    {
    	av_fifo_freep(&ring->fifo);
    }
    
    static void ring_reset(RingBuffer* ring)
    {
    	av_fifo_reset(ring->fifo);
    	ring->read_pos = 0;
    }
    
    static int ring_size(RingBuffer* ring)
    {
    	return av_fifo_size(ring->fifo) - ring->read_pos;
    }
    
    static int ring_space(RingBuffer* ring)
    {
    	return av_fifo_space(ring->fifo);
    }
    
    static int ring_generic_read(RingBuffer* ring, void* dest, int buf_size, void (*func)(void*, void*, int))
    {
    	int ret;
    
    	av_assert2(buf_size <= ring_size(ring));
    	ret = av_fifo_generic_peek_at(ring->fifo, dest, ring->read_pos, buf_size, func);
    	ring->read_pos += buf_size;
    
    	if (ring->read_pos > ring->read_back_capacity) {
    		av_fifo_drain(ring->fifo, ring->read_pos - ring->read_back_capacity);
    		ring->read_pos = ring->read_back_capacity;
    	}
    
    	return ret;
    }
    
    static int ring_generic_write(RingBuffer* ring, void* src, int size, int (*func)(void*, void*, int))
    {
    	av_assert2(size <= ring_space(ring));
    	return av_fifo_generic_write(ring->fifo, src, size, func);
    }
    
    static int ring_size_of_read_back(RingBuffer* ring)
    {
    	return ring->read_pos;
    }
    
    static int ring_drain(RingBuffer* ring, int offset)
    {
    	av_assert2(offset >= -ring_size_of_read_back(ring));
    	av_assert2(offset <= -ring_size(ring));
    	ring->read_pos += offset;
    	return 0;
    }

    3.协议定义及参数解析

    这一步主要是定义QUIC串流协议的私有数据结构,及需要传递的参数信息,这些参数通过ffmpeg统一的参数解析框架进行解析,我们只需要定义好参数名、参数类型、默认值等。

    
    typedef struct QuicContext {
        const AVClass *class;
        int quic_establish_timeout;
        int seekable;
        int block_size;         //-1:Default 1MB block size, 0:Not request block, notice MUST BE 0 when play a stream, >0:Valid block size.
        int block_consume;      //-1:Default 50% percent, >=0:Valid percent.
        char* service_port;
    
        QuicSyncContext * sync;
    } QuicContext;
    
    
    
    #define OFFSET(x) offsetof(QuicContext, x)
    #define D AV_OPT_FLAG_DECODING_PARAM
    #define E AV_OPT_FLAG_ENCODING_PARAM
    
    static const AVOption options[] = {
        { "quic_establish_timeout", "Quic session establish timeout in ms.",  OFFSET(quic_establish_timeout),AV_OPT_TYPE_INT,   { .i64 = -1 },      -1,     INT_MAX,  .flags = D|E },
        { "seekable",               "Control seekability of connection.",     OFFSET(seekable),              AV_OPT_TYPE_BOOL,  { .i64 = 0 },       -1,     1,        .flags = D|E },
        { "block_size",             "Request file in blocks with the size.",  OFFSET(block_size),            AV_OPT_TYPE_INT,   { .i64 = 524288 },  -1,     INT_MAX,  .flags = D|E },
        { "block_consume",          "Block consume percent threshold.",       OFFSET(block_consume),         AV_OPT_TYPE_INT,   { .i64 = 50 },      0,      100,      .flags = D|E },
        { "service_port",           "set quic service port",                  OFFSET(service_port),          AV_OPT_TYPE_STRING, {.str = NULL },    0,      0,        .flags = D|E },
        { NULL }
    };
    
    #define QUIC_CLASS(flavor)                          \
    static const AVClass flavor ## _context_class = {   \
        .class_name = # flavor,                         \
        .item_name  = av_default_item_name,             \
        .option     = options,                          \
        .version    = LIBAVUTIL_VERSION_INT,            \
    }
    
    QUIC_CLASS(quic);
    QUIC_CLASS(quics);

    4.数据同步与线程安全

    由于是多线程操作,必须考虑数据操作过程中的线程安全问题,这里我们定义一个名为QuicSyncContext的结构体来作为线程同步对象,具体定义见下述代码:

    
    enum {
        QP_STATE_INIT       = 0,
        QP_STATE_RECV_HDR   = 1,
        QP_STATE_RECV_BODY  = 2,
        QP_STATE_CLOSED     = 3,
    };
    
    typedef struct QuicSyncContext {
        quicpro_request_t * req;
    	int					inner_io_error;
    	int					io_error;
    	int					io_eof_reached;
        int                 qp_status;
    
    	int64_t				logical_pos;
    	RingBuffer          ring;
    
    	pthread_cond_t		cond_wakeup_main;
    	pthread_cond_t		cond_wakeup_close;
    	pthread_mutex_t		mutex;
    	int                 abort_request;
        URLContext      *   url_ctx;
        _Atomic int         ref_count;
    } QuicSyncContext;

    QuicPro对外提供的建连、接收数据、发送数据、关闭请求等接口都是异步的,作为跟连接相关的上下文对象,QuicSyncContext需要一个引用计数器ref_count来确保正确释放,释放函数如下:

    static int quic_sync_destroy(QuicSyncContext** ppc)
    {
        QuicSyncContext* c;
    
        if (!ppc || !*ppc)
        {
            return -1;
        }
        c = *ppc;
        if (c && atomic_fetch_add(&c->ref_count, -1) == 1)
        {
            pthread_cond_destroy(&c->cond_wakeup_close);
            pthread_cond_destroy(&c->cond_wakeup_main);
            pthread_mutex_destroy(&c->mutex);
            ring_destroy(&c->ring);
            free(c);
        }
        *ppc = NULL;
        return 0;
    }

    5.使用QuicPro获取网络串流

    使用QuicPro接口前需要按照接口要求实现相应的回调函数:

    /**
     * 发送数据回调函数
     * @method qp_feed_callback
     * @param buf 要发送的数据缓冲区容器
     * @param size 要发送的数据缓冲区大小
     * @param ctx 用户指针
     */
    static int qp_feed_callback(unsigned char* buf, int size, void* ctx)
    {
    	URLContext* h = ctx;
    	QuicContext* s = h->priv_data;
    
    	return 0;
    }
    
    /**
     * 接收HTTP3响应头回调函数
     * @method qp_response_header_callback
     * @param resp 请求返回的HTTP3响应头对象
     * @param ctx 用户指针
     */
    static int qp_response_header_callback(quicpro_response_t* resp, void* ctx)
    {
        QuicSyncContext* c = ctx;
    
        if (resp)
        {
            pthread_mutex_lock(&c->mutex);
            c->qp_status = QP_STATE_RECV_HDR;
            pthread_mutex_unlock(&c->mutex);
        }
    
    	return 0;
    }
    
    /**
     * 接收HTTP3响应包体数据回调函数
     * @method qp_response_body_callback
     * @param src 接收到的响应包体数据
     * @param size 接收到的响应包体数据大小
     * @param ctx 用户指针
     */
    static void qp_response_body_callback(unsigned char *src, size_t size, void *ctx)
    {
    	QuicSyncContext* c = ctx;
    	RingBuffer* ring = &c->ring;
    	int           ret = 0;
    
    	int fifo_space;
    
    	pthread_mutex_lock(&c->mutex);
        if (c->qp_status < QP_STATE_RECV_BODY)
        {
            c->qp_status = QP_STATE_RECV_BODY;
        }
    	if (c->io_eof_reached) {
    		pthread_cond_signal(&c->cond_wakeup_main);
    		pthread_mutex_unlock(&c->mutex);
    		goto quit;
    	}
    
    	fifo_space = ring_space(ring);
        while (fifo_space < size)
        {
            av_fifo_grow(ring->fifo, size);
            fifo_space = ring_space(ring);
        }
    	ret = ring_generic_write(ring, src, size, NULL);
    
    	if (ret <= 0) {
    		c->io_eof_reached = 1;
    		if (c->inner_io_error < 0)
    			c->io_error = c->inner_io_error;
    	}
    
    	pthread_cond_signal(&c->cond_wakeup_main);
    	pthread_mutex_unlock(&c->mutex);
    
    quit:
        quicpro_free_buffer_fn(src);
    }
    
    /**
     * 请求结束回调函数
     * @method qp_response_finish_callback
     * @param ctx 用户指针
     * @param status 状态码
     */
    static int qp_response_finish_callback(void* ctx, QPRESULT status)
    {
        QuicSyncContext* c = ctx;
    
    	return 0;
    }
    
    /**
     * 连接关闭回调函数
     * @method qp_close_callback
     * @param result 状态码
     * @param ctx 用户指针
     */
    static int qp_close_callback(int result, void* ctx)
    {
    	QuicSyncContext* c = ctx;
    
    	pthread_mutex_lock(&c->mutex);
        if (c->abort_request == 1)
    	{
            av_log(NULL, AV_LOG_INFO, "qp_close_callback caused abord "
                "request from ffmpeg.");
    		pthread_cond_signal(&c->cond_wakeup_close);
    	    c->io_error = AVERROR_EOF;
        }
        else
        {
            av_log(NULL, AV_LOG_INFO, "qp_close_callback caused abord "
                "request from quicpro connection closed.");
    		c->abort_request = 1;
            switch (c->qp_status)
            {
            case QP_STATE_INIT:
                c->io_error = AVERROR(ENETUNREACH);
                break;
            case QP_STATE_RECV_HDR:
    	        c->io_error = AVERROR_HTTP_NOT_FOUND;
                break;
            case QP_STATE_RECV_BODY:
    	        c->io_error = AVERROR(EIO);
                break;
            }
        }
    	c->io_eof_reached = 1;
    	pthread_cond_signal(&c->cond_wakeup_main);
    	pthread_mutex_unlock(&c->mutex);
        quic_sync_destroy(&c);
    
    	return 0;
    }

    6.实现QUIC串流协议插件

    参照ffmpeg串流协议结构定义,定义并实现自定义的协议接口及协议对象,数据读取流程如下图所示:

    图5.FFplay数据读取线程同步读取流程

    /**
     * 打开串流协议
     * @method quic_open
     * @param h 上下文对象
     * @param uri 协议资源地址
     * @param flags 控制选项
     */
    static int quic_open(URLContext *h, const char *uri, int flags) {
        QuicContext *qc = h->priv_data;
        int ret = 0;
        QPRESULT rv = QP_R_SUCCESS;
        char *url = NULL;
        int url_len = 0;
        const char *http_scheme = "http://";
        const char *https_scheme = "https://";
        const char *scheme = NULL;
        const char *deli = "://";
        char *p = NULL;
        char hostname[1024] = {0};
        char protocol[1024] = {0};
        char path[1024] = {0};
        char ip[16] = {0};
        int port = 0;
        char portstr[16] = {0};
        struct addrinfo hints = {0};
        struct addrinfo *ai = NULL;
        struct sockaddr_in *addr = NULL;
        QuicSyncContext* c = NULL;
        do {
            if (uri == NULL || qc->service_port == NULL) {
                ret = AVERROR_UNKNOWN;
                goto fifo_fail;
            }
    
            ret = get_quicpro_functions();
            if (ret < 0) {
                return AVERROR(ENFILE);
            }
            
            av_url_split(protocol, sizeof(protocol), NULL, 0, hostname, 
                sizeof(hostname), &port, path, sizeof(path), uri);
            if (strncmp(protocol, "quic", strlen(protocol)) == 0) {
                scheme = http_scheme;
            } else if (strncmp(protocol, "quics", strlen(protocol)) == 0) {
                scheme = https_scheme;
            } else {
                ret = AVERROR_PROTOCOL_NOT_FOUND;
                goto fifo_fail;
            }
    
            if (port <= 0) {
               port = 443;
            }
    
            // current only support stream data
            qc->seekable = 0;
            if (qc->seekable == 1) {
                h->is_streamed = 0;
            } else {
                h->is_streamed = 1;
            }
    
            //Replace protocol with certain scheme.
            p       = strstr(uri, deli);
            p       += strlen(deli);
            url_len = strlen(scheme) + strlen(p) + 1;
            url     = (char*)malloc(url_len);
    		if (!url) {
    			av_log(h, AV_LOG_ERROR, "Failed to malloc url buffer.\n");
                ret = AVERROR_INVALIDDATA;
    			goto return_error;
    		}
            memset(url, 0, url_len);
            strcpy(url, scheme);
            strcat(url, p);
    
            av_log(
                h, AV_LOG_INFO,
                "quic_open %s, block_size:%d, block_consume:%d, "
                    "quic_establish_timeout:%d, service_port:%s.\n",
                url,
                qc->block_size,
                qc->block_consume,
                qc->quic_establish_timeout,
                qc->service_port);
    
            c = calloc(1, sizeof(QuicSyncContext));
    		if (!c) {
    			av_log(h, AV_LOG_ERROR, "Failed to malloc QuicSyncContext instance.\n");
                ret = AVERROR_INVALIDDATA;
    			goto malloc_sync_failed;
    		}
            qc->sync = c;
            c->ref_count = 2;
            c->qp_status = QP_STATE_INIT;
    
    		ret = ring_init(&c->ring, BUFFER_CAPACITY, READ_BACK_CAPACITY);
    		if (ret < 0)
    			goto fifo_fail;
    
    		ret = pthread_mutex_init(&c->mutex, NULL);
    		if (ret != 0) {
    			av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
    			goto mutex_fail;
    		}
    
    		ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
    		if (ret != 0) {
    			av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
    			goto cond_wakeup_main_fail;
    		}
    
    		ret = pthread_cond_init(&c->cond_wakeup_close, NULL);
    		if (ret != 0) {
    			av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
    			goto cond_wakeup_close_fail;
    		}
    
            rv = quicpro_initialize_fn(QP_LOG_INFO, QPVER_I001);
            if (rv != QP_R_SUCCESS)
            {
                ret = AVERROR_UNKNOWN;
                goto quic_initialize_fail;
            }
            quicpro_set_log_callback_fn(quic_log_callback, NULL);
            c->req = quicpro_create_request_fn(qp_feed_callback, qp_response_header_callback,
                qp_response_body_callback, qp_response_finish_callback, qp_close_callback, c);
            if (!c->req)
            {
                ret = AVERROR_UNKNOWN;
                goto quic_initialize_fail;
    		}
    		quicpro_request_set_method_fn(c->req, "GET");
    		quicpro_request_set_scheme_fn(c->req, "http");
            if (strlen(hostname) > 0)
    		{
    			quicpro_request_set_hostname_fn(c->req, hostname);
            }
            if (qc->service_port)
    		{
    			quicpro_request_set_service_port_fn(c->req, qc->service_port);
            }
            if (strlen(path) > 0)
    		{
    			quicpro_request_set_path_fn(c->req, path);
            }
            if (qc->quic_establish_timeout > 0)
            {
                quicpro_request_set_connect_timeout_fn(c->req, qc->quic_establish_timeout);
            }
    		rv = quicpro_request_send_fn(c->req);
            if (rv != QP_R_SUCCESS)
            {
                ret = AVERROR_UNKNOWN;
                goto quic_send_fail;
            }
            
        } while (0);
    
        if (url != NULL) {
            free(url);
    	}
    
        av_log(h, AV_LOG_INFO, "quic_open successfully.");
    
        return 0;
    
    quic_send_fail:
        quicpro_request_close_fn(&c->req);
    quic_initialize_fail:
    	pthread_cond_destroy(&c->cond_wakeup_close);
    cond_wakeup_close_fail:
    	pthread_cond_destroy(&c->cond_wakeup_main);
    cond_wakeup_main_fail:
    	pthread_mutex_destroy(&c->mutex);
    mutex_fail:
    	ring_destroy(&c->ring);
    fifo_fail:
        free(c);
        qc->sync = NULL;
    malloc_sync_failed:
    	free(url);
    return_error:
        av_log(h, AV_LOG_ERROR, "quic_open failed(ret=%d).", ret);
        return ret;
    }
    
    /**
     * 从FIFO队列读取数据内部函数
     * @method quic_read_internal
     * @param h 上下文对象
     * @param dest 接收读取的数据的缓冲区容器
     * @param size 接收读取的数据的缓冲区容器大小
     * @param read_complete 是否完成读取
     * @param func 读取函数回调(可选)
     */
    static int quic_read_internal(URLContext* h, void* dest, int size, int read_complete,
    	void (*func)(void*, void*, int))
    {
    	QuicContext* qc = h->priv_data;
        QuicSyncContext* c = qc->sync;
    	RingBuffer* ring = &c->ring;
    	int           to_read = size;
    	int           ret = 0;
        int           timedOut = 1;
    
        if (!c) {
            return 0;
        }
    	pthread_mutex_lock(&c->mutex);
    
    	while (to_read > 0) {
    		int fifo_size, to_copy;
    		if (c->abort_request) {
    			ret = c->io_error;
                av_log(h, AV_LOG_INFO, "quic_read_internal abort request(ret=%d).", ret);
    			break;
    		}
    		fifo_size = ring_size(ring);
    		to_copy = FFMIN(to_read, fifo_size);
    		if (to_copy > 0) {
    			ring_generic_read(ring, dest, to_copy, func);
    			if (!func)
    				dest = (uint8_t*)dest + to_copy;
    			c->logical_pos += to_copy;
    			to_read -= to_copy;
    			ret = size - to_read;
    
    			if (to_read <= 0 || !read_complete)
    				break;
    		}
    		else if (c->io_eof_reached) {
    			if (ret <= 0) {
    				if (c->io_error)
    					ret = c->io_error;
    				else
    					ret = AVERROR_EOF;
    			}
                av_log(h, AV_LOG_INFO, "quic_read_internal reach eof(ret=%d).", ret);
    			break;
    		}
            do {
                if (ff_check_interrupt(&h->interrupt_callback))
                {
                    ret = 0;
                    goto unlock_and_return;
                }
                else if (c->abort_request)
                {
                    ret = c->io_error;
                    goto unlock_and_return;
                }
                timedOut = timedWait(&c->cond_wakeup_main, &c->mutex, 10);
            } while (timedOut);
    	}
    
    unlock_and_return:
    	pthread_mutex_unlock(&c->mutex);
    
    	return ret;
    }
    
    /**
     * 从串流协议读取数据回调函数
     * @method quic_read
     * @param h 上下文对象
     * @param buf 接收读取的数据的缓冲区容器
     * @param size 接收读取的数据的缓冲区容器大小
     */
    static int quic_read(URLContext* h, unsigned char* buf, int size)
    {
    	return quic_read_internal(h, buf, size, 0, NULL);
    }
    
    /**
     * 向串流协议写数据回调函数
     * @method quic_write
     * @param h 上下文对象
     * @param buf 接收读取的数据的缓冲区容器
     * @param size 接收读取的数据的缓冲区容器大小
     */
    static int quic_write(URLContext *h, const uint8_t *buf, int size) {
        QuicContext *s = h->priv_data;
        return 0;
    }
    
    /**
     * 串流协议seek回调函数
     * @method quic_seek
     * @param h 上下文对象
     * @param off 便宜量
     * @param whence 控制参数
     */
    static int64_t quic_seek(URLContext *h, int64_t off, int whence) {
        QuicContext *s = NULL;
        int64_t ret = -1;
    
        if (h->is_streamed) {
            if (whence == AVSEEK_SIZE) {
                return UINT64_MAX;
            } else {
                return AVERROR(ENOSYS);
            }
        }
    
        s = h->priv_data;
        if (ret < 0) {
            ret = AVERROR(ENOSYS);
        }
        return ret;
    }
    
    /**
     * 关闭串流协议回调函数
     * @method quic_close
     * @param h 上下文对象
     */
    static int quic_close(URLContext* h)
    {
    	QuicContext* qc = h->priv_data;
        QuicSyncContext* c = qc->sync;
    	int      ret;
    
    	pthread_mutex_lock(&c->mutex);
    	av_log(h, AV_LOG_INFO, "closing quic handle %p.", c->req);
    	ret = quicpro_request_close_fn(&c->req);
    	av_log(h, AV_LOG_INFO, "quicpro_request_close return %d.", ret);
        if (!c->abort_request)
    	{
            av_log(h, AV_LOG_INFO, "quic_close abort request from ffmpeg.");
    		c->abort_request = 1;
        }
    	pthread_mutex_unlock(&c->mutex);
    
        quic_sync_destroy(&qc->sync);
    	av_log(h, AV_LOG_INFO, "quic_close finished.");
    
    	return 0;
    }
    
    /**
     * 获取串流协议句柄回调函数
     * @method quic_get_file_handle
     * @param h 上下文对象
     */
    static int quic_get_file_handle(URLContext *h) {
        QuicContext *s = h->priv_data;
        return -1;
    }
    
    /**
     * quic scheme对象定义
     */
    const URLProtocol ff_quic_protocol = {
        .name                = "quic",
        .url_open            = quic_open,
        .url_read            = quic_read,
        .url_write           = quic_write,
        .url_seek            = quic_seek,
        .url_close           = quic_close,
        .url_get_file_handle = quic_get_file_handle,
        .priv_data_size      = sizeof(QuicContext),
        .priv_data_class     = &quic_context_class,
        .flags               = URL_PROTOCOL_FLAG_NETWORK,
        .default_whitelist   = "quic,quics"
    };
    
    /**
     * quics scheme对象定义
     */
    const URLProtocol ff_quics_protocol = {
        .name                = "quics",
        .url_open            = quic_open,
        .url_read            = quic_read,
        .url_write           = quic_write,
        .url_seek            = quic_seek,
        .url_close           = quic_close,
        .url_get_file_handle = quic_get_file_handle,
        .priv_data_size      = sizeof(QuicContext),
        .priv_data_class     = &quics_context_class,
        .flags               = URL_PROTOCOL_FLAG_NETWORK,
        .default_whitelist   = "quic,quics"
    };

    QuicPro通过HTTP3协议传输过来的数据流是京东直播数据流,数据流格式是FLV串流,FFmpeg 4.0及以上版本都支持FLV串流的解析及播放。集成上述QUIC协议后,使用方法如下:

    ./ffplay.exe quic://jdpull.jd.com/live/10332524_fhd.flv -service_port "quicpro.jd.com:20000"

    service_port 参数填写QUIC边缘服务器服务地址和端口。

    四.结语

    本文介绍了将QuicPro协议栈实现集成进ffplay及ijkplayer这类基于FFmpeg框架的播放器的代码及流程,展示了FFmpeg平台的强大与可扩展性。直播数据因数据量大、占用带宽高、传输距离长而符合典型的Long-Fat(长肥)网络定义。在应用QUIC技术之前,京东的直播产品都是使用基于TCP的协议(RTMP/HTTP-FLV/HLS等)进行直播流数据传输,TCP因其保守的拥堵控制策略,在应对Long-Fat网络应用场景时差强人意,QUIC技术的出现,为优化直播传输质量提供了新的选择。





    文章数
    2
    阅读量
    2160