prefect是一个python的工作任务流调度实时框架,prefect可以快速构建平台系统复杂模块间工作流的监测。当平台系统模块之间的调用链越来越复杂时候,任务执行起来,已经很难盘点清楚程序逻辑和工作流运转的路线图。prefect通过简单的@注解,即可清晰在线、实时的梳理清楚系统运行“思维导图”。
prefect是github的开源项目:
GitHub - PrefectHQ/prefect: The easiest way to coordinate your dataflowThe easiest way to coordinate your dataflow. Contribute to PrefectHQ/prefect development by creating an account on GitHub.https://github.com/PrefectHQ/prefect
安装:
pip install -U prefect
@flow可以嵌套使用,@flow里面可以调用@task的任务。但@task不能反过调用@flow,原则上尽量使用@flow,@task是程序代码的最小原子单位。
import timefrom prefect import flow, task, get_run_logger@task(retries=2, retry_delay_seconds=30)
def failure():print('running')raise Exception('异常')@flow(name="my_flow_b", timeout_seconds=15)
def flowb(param: str):time.sleep(20)return 'OKb'@flow(name="my_flow_a")
def flowa(param: str):time.sleep(15)failure.submit()return 'OKa'@flow(name="my_flow_2", description='第二个工作流')
def flow2():logger = get_run_logger()logger.info('启动flow2')time.sleep(3)flowb('start b')return 'OK2'@flow(name="my_flow_1", description='第一个工作流')
def flow1():logger = get_run_logger()logger.info('启动flow1')time.sleep(2)flowa('start a')return 'OK1'if __name__ == '__main__':flow1()flow2()
程序运行后,在控制台使用命令启动prefect服务器端:
prefect orion start
默认prefect启动在http://127.0.0.1:4200,浏览器打开:
查看flow1运行时日志:
flowa中的实时log日志:
flowb的启动参数
可以在控制台查看prefect的配置项:
prefect config view
默认的prefect启动端口在4200,用户通过127.0.0.1:4200访问,可以通过配置设置新的启动端口:
prefect config set PREFECT_ORION_API_PORT=1234
指定prefect启动在1234端口。
然后重启服务器即可:
prefect orion start
prefect的后端服务器也可以通过REST API访问调用。prefect的文档可以直接查看:
http://127.0.0.1:4200/docs
http://127.0.0.1:4200/redoc
以上效果均在Windows环境下跑出来。prefect有更多的功能,值得研究和使用。