zlMediaKit 7 utils模块--ringbuffer发布订阅
创始人
2024-04-04 15:23:30
0

onceToken

利用变量生命周期执行传入的构造和析构,保证代码执行的先后顺序

class onceToken {
public:using task = std::function;templateonceToken(const FUNC &onConstructed, task onDestructed = nullptr) {onConstructed();_onDestructed = std::move(onDestructed);}onceToken(std::nullptr_t, task onDestructed = nullptr) {_onDestructed = std::move(onDestructed);}~onceToken() {if (_onDestructed) {_onDestructed();}}private:onceToken() = delete;onceToken(const onceToken &) = delete;onceToken(onceToken &&) = delete;onceToken &operator=(const onceToken &) = delete;onceToken &operator=(onceToken &&) = delete;private:task _onDestructed;
};

应用

socket->setOnErr([weak_self, weak_session, id](const SockException &err) {// 在本函数作用域结束时移除会话对象// 目的是确保移除会话前执行其 onError 函数// 同时避免其 onError 函数抛异常时没有移除会话对象onceToken token(nullptr, [&]() {// 移除掉会话auto strong_self = weak_self.lock();if (!strong_self) {return;}//从共享map中移除本session对象lock_guard lck(*strong_self->_session_mutex);strong_self->_session_map->erase(id);});// 获取会话强应用if (auto strong_session = weak_session.lock()) {// 触发 onError 事件回调strong_session->onError(err);}});

RingBuffer

RingDelegate

继承的子类需要实现onWrite函数

virtual void onWrite(T in, bool is_key = true) = 0;

_RingStorage

在这里插入图片描述

数据

using GopType = List< List > >;//链表中装着链表,次级链表是装着bool _have_idr;size_t _size;         //当前帧的数量 SUM(ervey gop size)size_t _max_size;     //最大记录帧数量 不能小于32。即最少最少也能存32帧size_t _max_gop_size; //最大gop个数(Group of Picture IBPBP)GopType _data_cache;  //链表套链表

构造

设置 _max_size _max_gop_size

clearCache清除主list,插入一个空的次级list

popFrontGop 弹出最先插入的GOP

    void clearCache() {_size = 0;_have_idr = false;_data_cache.clear();_data_cache.emplace_back();}void popFrontGop()

write

传过来的是I帧,就往新的GOP组里添加,

​ 传过来的是PB帧,

​ 之前的没有I帧则PB帧丢弃;

​ 之前已经有I帧到来,插入到对应的GOP组

如果帧总数超过 _max_size,先尝试清除老的GOP缓存,还是大于最大缓冲限制,那么清空所有GOP

/*** 写入环形缓存数据* @param in 数据* @param is_key 是否为关键帧* @return 是否触发重置环形缓存大小*/void write(T in, bool is_key = true) {if (is_key) {_have_idr = true;if (!_data_cache.back().empty()) {//当前gop列队还没收到任意缓存_data_cache.emplace_back();}if (_data_cache.size() > _max_gop_size) {// GOP个数超过限制,那么移除最早的GOPpopFrontGop();}}if (!_have_idr) {//缓存中没有关键帧,那么gop缓存无效return;}_data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));if (++_size > _max_size) {//GOP缓存溢出while (_data_cache.size() > 1) {//先尝试清除老的GOP缓存popFrontGop();}if (_size > _max_size) {//还是大于最大缓冲限制,那么清空所有GOPclearCache();}}}

List

继承了list,增加了append/for_each函数

template
class List : public std::list {
public:templateList(ARGS &&...args) : std::list(std::forward(args)...) {};~List() = default;void append(List &other) {if (other.empty()) {return;}this->insert(this->end(), other.begin(), other.end());other.clear();}templatevoid for_each(FUNC &&func) {for (auto &t : *this) {func(t);}}templatevoid for_each(FUNC &&func) const {for (auto &t : *this) {func(t);}}
};

_RingReader

环形缓存读取器,构造时即绑定了对应的_RingStorage

自己注册读取和detach函数,flushGop一次性读取所有帧

	std::function _detach_cb = []() {};std::function _read_cb = [](const T &) {};void flushGop() {if (!_storage) {return;}_storage->getCache().for_each([this](const List > &lst) {lst.for_each([this](const std::pair &pr) { onRead(pr.second, pr.first); });});}

_RingReaderDispatcher

环形事件派发器

	std::atomic_int _reader_size;                         //?std::function _on_size_changed;      //?typename RingStorage::Ptr _storage;                   //环形存储std::unordered_map > _reader_map;//本线程中,读取相同源的player的集合

析构:_reader_map中的所有RingReader调用onDetach

构造:private,初始化_storage,_reader_size,_on_size_changed

write

,调用_reader_map中的每个RingReader的onRead,然后往环形存储中写

rtp包来时,给每个拉流源调用onRead,同时往自己的_storage里写入缓存,当新的拉流源来时,根据useCache判断是否使用_storage

void write(T in, bool is_key = true) {for (auto it = _reader_map.begin(); it != _reader_map.end();) { auto reader = it->second.lock();if (!reader) {it = _reader_map.erase(it);--_reader_size;onSizeChanged(false);continue;}reader->onRead(in, is_key);++it;}_storage->write(std::move(in), is_key);
}

attach

附加到poller上,构造RingReader,_reader_map++。可以选择是否使用存储的cache。

std::shared_ptr attach(const EventPoller::Ptr &poller, bool use_cache) {if (!poller->isCurrentThread()) {throw std::runtime_error("必须在绑定的poller线程中执行attach操作");}std::weak_ptr<_RingReaderDispatcher> weakSelf = this->shared_from_this();auto on_dealloc = [weakSelf, poller](RingReader *ptr) {poller->async([weakSelf, ptr]() {auto strongSelf = weakSelf.lock();if (strongSelf && strongSelf->_reader_map.erase(ptr)) {--strongSelf->_reader_size;strongSelf->onSizeChanged(false);}delete ptr;});};std::shared_ptr reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);_reader_map[reader.get()] = reader;++_reader_size;onSizeChanged(true);return reader;
}

RingBuffer

    using Ptr = std::shared_ptr;using RingReader = _RingReader;                     using RingStorage = _RingStorage;using RingReaderDispatcher = _RingReaderDispatcher;using onReaderChanged = std::function; //std::mutex _mtx_map;                               //?std::atomic_int _total_count { 0 };                //?_total_count是正在拉流的客户端的个数typename RingStorage::Ptr _storage;                //环形缓存typename RingDelegate::Ptr _delegate;           //子类需要实现onWriteonReaderChanged _on_reader_changed;                //读取器变化的函数std::unordered_map _dispatcher_map;//不同的poller对应的不同的RingReaderDispatcher

构造

初始化RingStorage

attach

如果**_dispatcher_map中没有对应的poller键,new一个RingReaderDispatcher**,使用clone的RingStorage

write

有_delegate,_delegate->onWrite

否则 调用每个环形事件分发器中(其他线程)的write往自己线程的缓存写。往自己的线程中写

本线程的对应的pusher数据来了,往storage中写,同时调用各个线程的读poller去读这数据

void write(T in, bool is_key = true) {if (_delegate) {_delegate->onWrite(std::move(in), is_key);return;}LOCK_GUARD(_mtx_map);for (auto &pr : _dispatcher_map) {auto &second = pr.second;//切换线程后触发onRead事件pr.first->async([second, in, is_key]() { second->write(std::move(const_cast(in)), is_key); }, false);}_storage->write(std::move(in), is_key);}

发送包到player的调用栈

EventPoller::runLoop//
addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) //
pr.first->async([second, in, is_key]() { second->write(std::move(const_cast(in)), is_key); }, false);//
_RingReaderDispatcher::write --->reader->onRead(in, is_key);
_RingReader::onRead--->_read_cb(data);_play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {auto strongSelf = weakSelf.lock();if (!strongSelf) {return;}strongSelf->sendRtpPacket(pack);//发送rtp包});

StrPrinter

一个可以支持<<的字符串

class _StrPrinter : public std::string {
public:_StrPrinter() {}template_StrPrinter& operator <<(T && data) {_stream << std::forward(data);this->std::string::operator=(_stream.str());return *this;}std::string operator <<(std::ostream&(*f)(std::ostream&)) const {return *this;}private:std::stringstream _stream;
};

EventDispatcher/NoticeCenter

NoticeCenter

结构

std::recursive_mutex _mtxListener;
std::unordered_map _mapListener;

private 获取和删除

    EventDispatcher::Ptr getDispatcher(const std::string &event, bool create = false) {std::lock_guard lck(_mtxListener);auto it = _mapListener.find(event);if (it != _mapListener.end()) {return it->second;}if (create) {//如果为空则创建一个EventDispatcher::Ptr dispatcher(new EventDispatcher());_mapListener.emplace(event, dispatcher);return dispatcher;}return nullptr;}void delDispatcher(const std::string &event, const EventDispatcher::Ptr &dispatcher) {std::lock_guard lck(_mtxListener);auto it = _mapListener.find(event);if (it != _mapListener.end() && dispatcher == it->second) {//两者相同则删除_mapListener.erase(it);}}

EventDispatcher

在这里插入图片描述

任意类型的FUNC

template

void addListener(void *tag, FUNC &&func) {

​ using funType = typename function_traits::stl_function_type;

​ std::shared_ptr pListener(new funType(std::forward(func)), [](void *ptr) {

​ funType *obj = (funType *) ptr;

​ delete obj;

​ });

多参数

template

int emitEvent(ArgsType &&…args) {

​ using funType = std::function(args))…)>;

​ funType *obj = (funType *) (pr.second.get());

​ (*obj)(std::forward(args)…);

class EventDispatcher {
public:friend class NoticeCenter;using Ptr = std::shared_ptr;~EventDispatcher() = default;private:using MapType = std::unordered_multimap >;EventDispatcher() = default;class InterruptException : public std::runtime_error {public:InterruptException() : std::runtime_error("InterruptException") {}~InterruptException() {}};templateint emitEvent(ArgsType &&...args) {using funType = std::function(args))...)>;decltype(_mapListener) copy;{//先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁std::lock_guard lck(_mtxListener);copy = _mapListener;}int ret = 0;for (auto &pr : copy) {funType *obj = (funType *) (pr.second.get());try {(*obj)(std::forward(args)...);++ret;} catch (InterruptException &) {++ret;break;}}return ret;}templatevoid addListener(void *tag, FUNC &&func) {using funType = typename function_traits::type>::stl_function_type;std::shared_ptr pListener(new funType(std::forward(func)), [](void *ptr) {funType *obj = (funType *) ptr;delete obj;});std::lock_guard lck(_mtxListener);_mapListener.emplace(tag, pListener);}void delListener(void *tag, bool &empty) {std::lock_guard lck(_mtxListener);_mapListener.erase(tag);empty = _mapListener.empty();}private:std::recursive_mutex _mtxListener;MapType _mapListener;
};

如果用C

完美转发void*代替

锁+map用静态的map和vector代替(非多线程)

typedef int (*BG_RPC_MSG_HANDLER_FUNC)(void *rpc_msg);
std::map> event_handles;void event_register(int id, BG_RPC_MSG_HANDLER_FUNC func)
{auto& event_funcs = event_handles[id];for (const auto& each_func : event_funcs){if (each_func == func){error_log("id:%d already register!", id);return;}}event_handles[id].push_back(func);
}void event_call(int id, void *msg)
{auto it = event_handles.find(id);if (it == event_handles.end()) return;for (const auto &h : it->second){h(msg);}
}

总结

  • onceToken

  • EventDispatcher 模板编程,std::unordered_multimap<**void ***, std::shared_ptr >的使用; std::forward, ArgsType &&…args

  • 如何多线程处理一个rtp数据包,且不涉及到拷贝

    每个RingBuffer有对应的map [EventPoller::Ptr, typename RingReaderDispatcher] 加锁,给对应的RingReaderDispatcher发rtp包指针,RingReaderDispatcher再给_reader_map中的不同客户端(但是是相同的poller管理)发rtp包。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...