【broadcast-service】一个轻量级Python发布订阅者框架
创始人
2024-02-09 17:24:49
0

本文节选至本人博客:https://www.blog.zeeland.cn/archives/broadcast-service-description

Introduction

前两天在Python最佳实践-构建自己的第三方库文章中介绍了自己构建的一个轻量级的Python发布订阅者框架,今天来简单介绍一下。

项目地址:

  • https://github.com/Undertone0809/broadcast-service
  • https://pypi.org/project/broadcast-service/

broadcast-service

broadcast-service是一个轻量级Python发布订阅者框架,事实上,我更愿意称其为广播模式,因为该框架更加的轻量级,通过broadcast-service,只需要引入一个单例类,你就可以十分轻松地构建起一个发布订阅者模式,几乎没有代码侵入性。

Install

执行以下语句即可安装该库。

pip install broadcast-service

Usage

下面是一个简单的发布订阅者实现:

from broadcast_service import broadcast_servicedef handle_msg():print('handle_msg callback')if __name__ == '__main__':# listen topicbroadcast_service.listen('Test', handle_msg)# publish broadcastbroadcast_service.broadcast('Test')

如果你想要在发布topic的时候传入对应的参数,你只需要将参数传入broadcast(topic_name, params) params中。

from broadcast_service import broadcast_servicedef handle_msg(params):print(params)if __name__ == '__main__':info = 'This is very important msg'# listen topicbroadcast_service.listen('Test', handle_msg)# publish broadcastbroadcast_service.broadcast('Test', info)
from broadcast_service import broadcast_servicedef handle_msg(info, info2):print(info)print(info2)if __name__ == '__main__':info = 'This is very important msg'info2 = 'This is also a very important msg.'# listen topicbroadcast_service.listen('Test', handle_msg)# publish broadcastbroadcast_service.broadcast('Test', info, info2)

此外,broadcast-service默认支持异步回调的方式,避免了现成堵塞的发生,下面是一个形象的demo演示。

    One day, leader Tom arrive the company but find not one staff in companybecause all staff are playing outside. Therefor, Tom send a messageeveryone must must return to company now. Staff Jack, Jasmine, Jane go backnow when they receive it. Actually, they need different time to go backin different places. When they return, they need to declare that they are back.---有一天,领导Tom来到公司,却发现公司里没有一个员工,因为所有的员工都在外面玩耍。于是,Tom发了一条信息,现在每个人都必须回到公司。Jack,Jasmine,Jane必须现在回去事实上,当他们收到信息时,他们正处在不同的地方,每个人回去需要花费的时间不同,并且回来后他们需要声明他们回来了。
from broadcast_service import broadcast_service
import time
import randomclass Application:"""This demo aim to show how to use broadcast-service.Scene:One day, leader Tom arrive the company but find not one staff in companybecause all staff are playing outside. Therefor, leader Tom send a messageeveryone must must return to company now. Staff Jack, Jasmine, Jane go backnow when they receive it. Actually, they need different time to go backin different places. When they return, they need to declare that they are back.Attention:broadcast-service is asynchronous by defalut. If you want to close the asyncstate. You can use:broadcast_service.enable_async = Falseto close the async statement."""def __init__(self):self.leader = Leader('Tom')self.staff1 = Staff('Jack')self.staff2 = Staff('Jasmine')self.staff3 = Staff('Jane')self.current_time = Nonedef run(self):self.current_time = time.time()self.leader.notice_go_back()def print_time():return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))class Leader:def __init__(self, name):self.name = namedef notice_go_back(self):print('[{1}] {0}(leader) notice meeting now.'.format(self.name, print_time()))meeting_info = 'You guys must go back now!'broadcast_service.broadcast('meeting', meeting_info)class Staff:def __init__(self, name):self.name = nameself.rec_msg()def rec_msg(self):broadcast_service.listen('meeting', self.go_back)def go_back(self, info):print("[{2}] {0}(staff) receive msg '{1}' and go back now.".format(self.name, info, print_time()))time.sleep(random.randint(1, 5))print('[{1}] {0}(staff) is back now.'.format(self.name, print_time()))if __name__ == '__main__':app = Application()app.run()

事实上,从如下结果可以看到,该主体发布时,订阅者可以同时响应订阅的主题,而不会产生线程堵塞的问题。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...