最佳实践之弹幕功能设计 | go 技术论坛-金年会app官方网
[toc]
写在前面
最近逛b站刷各种鬼畜,发现弹幕真是有意思的话题,弹幕内容各种搞怪鬼畜等,那么言归正传,弹幕在在线视频平台或者社交直播的不可或缺之物,本文将如何开发一个简单的弹幕和在线观看统计功能,如果你正在考虑在你的应用中加入弹幕功能,那么你来对地方了。
在深入实现之前,我们先来了解一下弹幕是什么。弹幕,源自日本,是指在视频、直播等画面上飘动的一些实时评论或互动信息。用户可以通过输入文字,将其发送到共享的画面上,形成一种实时互动的社交体验。
主要功能
这里主要实现接口:
- 实时获取弹幕
- 发布弹幕
数据库表设计
这里需要设计弹幕表,用来记录对应视频的弹幕信息
// barrage 弹幕表结构
type barrage struct {
    id          int    //主键
    content     string //弹幕内容
    currenttime int    //弹幕在视频中的展示时间,秒
    addtime     int64  //添加时间
    userid      int    //添加用户
    status      int    //弹幕状态
    episodesid  int    //归属具体视频
    videoid     int    //归属影视作品
}相信下图很好理解:
实时获取弹幕
下面来实现弹幕的核心功能,实习获取弹幕信息,为了系统性能和方便统计在线观看,这里使用到websocket保证实时性。
主要逻辑
1、使用ws协议,从视频播放开始每隔60s获取一次弹幕内容,当前60s播放结束后再次请求下一次60s开始的时候后期60s的弹幕内容,前端这要在60s循环,对比弹幕时间和视频播放时间对应,渲染到屏幕即可。
2、为了统计在线观看数,这里需要设计全局线程安全的map:
type videostats struct {
    views map[int]int  //id->view_count
    mu    sync.mutex   //互斥锁保证并发安全
}代码实现
获取弹幕,最终需要在数据库中查询,查询条件是:episodesid视频id,starttime-endtime时间范围,前端需要给到两个参数:
- episodesid
- currenttime
为了统计在线观看数,我们需要实现一个并发安全的map和对应操作方法:
type videostats struct {
    views map[int]int
    mu    sync.mutex
}
var videostats = newvideostats()
func newvideostats() *videostats {
    return &videostats{
        views: make(map[int]int),
    }
}
func (vs *videostats) addview(videoid int) {
    vs.mu.lock()
    defer vs.mu.unlock()
    vs.views[videoid]
}
func (vs *videostats) reduceview(videoid int) {
    vs.mu.lock()
    defer vs.mu.unlock()
    vs.views[videoid]--
}
func (vs *videostats) getviews(videoid int) int {
    vs.mu.lock()
    defer vs.mu.unlock()
    return vs.views[videoid]
}定义一些结构体:
// wsdata 数据格式
type wsdata struct {
    currenttime int //当前时间
    episodesid  int //视频id
}
//返回的数据格式
type barrage struct {
    barragemsg []models.barragedata `json:"barrage_msg"`
    useronline int                  `json:"user_online"`
}这里需要解决跨域问题:
// 设置websocket跨域问题
var (
    upgrader = websocket.upgrader{
        checkorigin: func(r *http.request) bool {
            return true
        },
    }
)接下来看controller层的核心代码:
func (b *barragecontrollers) barragews() {
    var (
        conn    *websocket.conn
        err     error
        data    []byte
        barrage barrage
    )
    //将http转为websocket
    if conn, err = upgrader.upgrade(b.ctx.responsewriter, b.ctx.request, nil); err != nil {
        conn.close()
    }
    if _, data, err = conn.readmessage(); err != nil {
        conn.close()
    }
    var wsdatainfo wsdata
    json.unmarshal(data, &wsdatainfo)
    videostats.addview(wsdatainfo.episodesid)
    resbarragemsg(conn, wsdatainfo, barrage)
    //用户退出播放视频,需要对对应视频在线数减一,并且关闭连接
    defer func(eid int) {
        videostats.reduceview(eid)
        conn.close()
    }(wsdatainfo.episodesid)
    //监听消息
    for {
        if _, data, err = conn.readmessage(); err != nil {
            conn.close()
        }
        var wsdata wsdata
        json.unmarshal(data, &wsdata)
        resbarragemsg(conn, wsdata, barrage)
    }
}
func resbarragemsg(conn *websocket.conn, wsdata wsdata, barrage barrage) {
    var err error
    //当前时间开始后的60s
    endtime := wsdata.currenttime  60
    //获取弹幕数据
    _, barrage.barragemsg, err = models.barragelist(wsdata.episodesid, wsdata.currenttime, endtime)
    //返回在线人数
    barrage.useronline = videostats.getviews(wsdata.episodesid)
    if err == nil {
        if err := conn.writejson(barrage); err != nil {
            conn.close()
        }
    }
}这里您可能有疑问,为什么在for之前要获取一次websocket的数据?
原因是:为了方便统计对应视频的在线观看人数,如果我们之间在for中做这就是,其实是很复杂的,可能会涉及到channel,并发,数据一致性等各种问题,您可以理解为为了简化流程。
model层代码:
// barragedata 弹幕返回结构
type barragedata struct {
    id          int    `json:"id"`
    content     string `json:"content"`
    currenttime int    `json:"currenttime"`
}
// barragelist 获取指定时间范围弹幕内容
func barragelist(episodesid int, starttime int, endtime int) (int64, []barragedata, error) {
    o := orm.neworm()
    var barrages []barragedata
    num, err := o.raw("select id,content,`current_time` from barrage where status=1 and episodes_id=? and `current_time`>=? and `current_time`, episodesid, starttime, endtime).queryrows(&barrages)
    return num, barrages, err
}我们在router配置接口路由:
package routers
import (
    "fyoukuapi/controllers"
    "github.com/astaxie/beego"
)
// 路由配置
func init() {
    //弹幕功能
    beego.router("/barrage/ws", &controllers.barragecontrollers{}, "get:barragews")
}
完整代码
controller层:
package controllers
import (
    "encoding/json"
    "fyoukuapi/models"
    "github.com/astaxie/beego"
    "github.com/gorilla/websocket"
    "net/http"
    "sync"
)
type barragecontrollers struct {
    beego.controller
}
type videostats struct {
    views map[int]int
    mu    sync.mutex
}
var videostats = newvideostats()
func newvideostats() *videostats {
    return &videostats{
        views: make(map[int]int),
    }
}
func (vs *videostats) addview(videoid int) {
    vs.mu.lock()
    defer vs.mu.unlock()
    vs.views[videoid]
}
func (vs *videostats) reduceview(videoid int) {
    vs.mu.lock()
    defer vs.mu.unlock()
    vs.views[videoid]--
}
func (vs *videostats) getviews(videoid int) int {
    vs.mu.lock()
    defer vs.mu.unlock()
    return vs.views[videoid]
}
// wsdata 数据格式
type wsdata struct {
    currenttime int //当前时间
    episodesid  int //视频id
}
type barrage struct {
    barragemsg []models.barragedata `json:"barrage_msg"`
    useronline int                  `json:"user_online"`
}
// 设置websocket跨域问题
var (
    upgrader = websocket.upgrader{
        checkorigin: func(r *http.request) bool {
            return true
        },
    }
)
// barragews 获取弹幕websocket 核心逻辑:使用ws协议,从视频播放开始获取60s的弹幕内容,60s播放结束后再次请求后60s的弹幕内容,
// 前端这要在60s循环对比弹幕时间和视频播放时间对应,渲染到屏幕即可。
func (b *barragecontrollers) barragews() {
    var (
        conn    *websocket.conn
        err     error
        data    []byte
        barrage barrage
    )
    //将http转为websocket
    if conn, err = upgrader.upgrade(b.ctx.responsewriter, b.ctx.request, nil); err != nil {
        conn.close()
    }
    if _, data, err = conn.readmessage(); err != nil {
        conn.close()
    }
    var wsdatainfo wsdata
    json.unmarshal(data, &wsdatainfo)
    videostats.addview(wsdatainfo.episodesid)
    resbarragemsg(conn, wsdatainfo, barrage)
    //用户退出视频
    defer func(eid int) {
        videostats.reduceview(eid)
        conn.close()
    }(wsdatainfo.episodesid)
    //监听消息
    for {
        if _, data, err = conn.readmessage(); err != nil {
            conn.close()
        }
        var wsdata wsdata
        json.unmarshal(data, &wsdata)
        resbarragemsg(conn, wsdata, barrage)
    }
}
func resbarragemsg(conn *websocket.conn, wsdata wsdata, barrage barrage) {
    var err error
    //当前时间开始后的60s
    endtime := wsdata.currenttime  60
    //获取弹幕数据
    _, barrage.barragemsg, err = models.barragelist(wsdata.episodesid, wsdata.currenttime, endtime)
    //返回在线人数
    barrage.useronline = videostats.getviews(wsdata.episodesid)
    if err == nil {
        if err := conn.writejson(barrage); err != nil {
            conn.close()
        }
    }
}model层:
package models
import (
    "github.com/astaxie/beego/orm"
    "time"
)
// barrage 弹幕表结构
type barrage struct {
    id          int    //主键
    content     string //弹幕内容
    currenttime int    //当前时间,秒
    addtime     int64  //添加时间
    userid      int    //添加用户
    status      int    //弹幕状态
    episodesid  int    //弹幕视频
    videoid     int    //归属视频
}
// barragedata 弹幕返回结构
type barragedata struct {
    id          int    `json:"id"`
    content     string `json:"content"`
    currenttime int    `json:"currenttime"`
}
func init() {
    orm.registermodel(new(barrage))
}
// barragelist 获取指定时间范围弹幕内容
func barragelist(episodesid int, starttime int, endtime int) (int64, []barragedata, error) {
    o := orm.neworm()
    var barrages []barragedata
    num, err := o.raw("select id,content,`current_time` from barrage where status=1 and episodes_id=? and `current_time`>=? and `current_time`, episodesid, starttime, endtime).queryrows(&barrages)
    return num, barrages, err
}测试
使用api工具测试
返回数据:
{
    "barrage_msg": [
        {
            "id": 53,
            "content": "冲冲冲!",
            "currenttime": 1
        },
        {
            "id": 54,
            "content": "斗破大陆一片天,谁见海老不递烟!",
            "currenttime": 1
        },
        {
            "id": 55,
            "content": "你们看到这里的时候我已经看完了",
            "currenttime": 1
        },
        {
            "id": 56,
            "content": "打倒唐三,胜利属于武魂殿",
            "currenttime": 8
        },
        {
            "id": 57,
            "content": "萧炎突破斗帝了哎",
            "currenttime": 10
        },
        {
            "id": 58,
            "content": "魂天帝受死吧",
            "currenttime": 10
        },
        {
            "id": 61,
            "content": "斗破大陆一片天,谁见海老不递烟!",
            "currenttime": 10
        },
        {
            "id": 62,
            "content": "冲冲冲!",
            "currenttime": 14
        },
        {
            "id": 63,
            "content": "反派死于话多!",
            "currenttime": 14
        },
        {
            "id": 64,
            "content": "话太多了!",
            "currenttime": 17
        },
        {
            "id": 52,
            "content": "发个弹幕试一试",
            "currenttime": 18
        }
    ],
    "user_online": 2
}
最终效果:

发布弹幕
主要逻辑
发布弹幕功能其实就很简单了,主要有两步:1、发布弹幕后前端立即渲染到屏幕上,2、写入数据库。直接上代码吧!
代码实现
controller层:
func (b *barragecontrollers) save() {
    uid, _ := b.getint("uid")
    content := b.getstring("content")
    currenttime, _ := b.getint("currenttime")
    episodesid, _ := b.getint("episodesid")
    videoid, _ := b.getint("videoid")
    if content == "" {
        b.data["json"] = returnerror(4001, "弹幕不能为空")
        b.servejson()
    }
    if uid == 0 {
        b.data["json"] = returnerror(4002, "请先登录")
        b.servejson()
    }
    if episodesid == 0 {
        b.data["json"] = returnerror(4003, "必须指定剧集id")
        b.servejson()
    }
    if videoid == 0 {
        b.data["json"] = returnerror(4005, "必须指定视频id")
        b.servejson()
    }
    if currenttime == 0 {
        b.data["json"] = returnerror(4006, "必须指定视频播放时间")
        b.servejson()
    }
    err := models.savebarrage(episodesid, videoid, currenttime, uid, content)
    if err == nil {
        b.data["json"] = returnsuccess(0, "success", "", 1)
        b.servejson()
    } else {
        b.data["json"] = returnerror(5000, err)
        b.servejson()
    }
}model层:
// savebarrage 保存弹幕
func savebarrage(episodesid int, videoid int, currenttime int, uid int, content string) error {
    o := orm.neworm()
    var barrage barrage
    barrage.content = content
    barrage.currenttime = currenttime
    barrage.addtime = time.now().unix()
    barrage.userid = uid
    barrage.status = 1
    barrage.episodesid = episodesid
    barrage.videoid = videoid
    _, err := o.insert(&barrage)
    return err
}router路由配置:
package routers
import (
    "fyoukuapi/controllers"
    "github.com/astaxie/beego"
)
// 路由配置
func init() {
    //弹幕功能
    beego.router("/barrage/ws", &controllers.barragecontrollers{}, "get:barragews")
  beego.router("/barrage/save", &controllers.barragecontrollers{}, "post:save")
}测试
使用api工具:
结果:
总结
文章到这里就简单的介绍结束了,实现一个弹幕功能并不是一项复杂的任务,但要确保其在用户体验、实时性和安全性方面都能达到最佳水平,需要综合考虑前后端的协同工作。通过本文的学习,相信你已经对实现弹幕功能有了更清晰的认识。
本作品采用《cc 协议》,转载必须注明作者和本文链接
 
 
: 1: : 1: : 1: