糖尿病康复,内容丰富有趣,生活中的好帮手!
糖尿病康复 > 工作流调度系统Apache DolphinScheduler介绍和设计原理

工作流调度系统Apache DolphinScheduler介绍和设计原理

时间:2019-02-20 13:34:45

相关推荐

工作流调度系统Apache DolphinScheduler介绍和设计原理

1 概述

Apache DolphinScheduler(目前处在孵化阶段)是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,其致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用。

DolphinScheduler是今年()中国易观公司开源的一个调度系统,在今年美国时间8月29号,易观开源的分布式任务调度引擎DolphinScheduler(原EasyScheduler)正式通过顶级开源组织Apache基金会的投票决议,根据Apache基金会邮件列表显示,在包含11个约束性投票(binding votes)和2个无约束性投票(non-binding votes)的投票全部持赞同意见,无弃权票和反对票,投票顺利通过,这样便以全票通过的优秀表现正式成为了Apache孵化器项目!

1.1 背景

在,易观在运营自己6.8Pb大小、6.02亿月活、每天近万个调度任务的大数据平台时,受到ETL复杂的依赖关系、平台易用性、可维护性及二次开发等方面掣肘,易观的技术团队渴望找到一个具有以下功能的数据调度工具:

易于使用,开发人员可以通过非常简单的拖拽操作构建ETL过程。不仅对于ETL开发人员,无法编写代码的人也可以使用此工具进行ETL操作,例如系统管理员和分析师;解决“复杂任务依赖”问题,并且可以实时监视ETL运行状态;支持多租户;支持许多任务类型:Shell,MR,Spark,SQL(mysql,postgresql,hive,sparksql),Python,Sub_Process,Procedure等;支持HA和线性可扩展性。

易观技术团队意识到现有开源项目没有能够达到他们要求的,因此决定自行开发这个工具。他们在底设计了DolphinScheduler的主要架构;5月完成第一个内部使用版本,后来又迭代了几个内部版本后,系统逐渐稳定下来。

这里介绍一下DolphinScheduler易观技术团队,他们是一支来自百度、阿里、百分点、Ptmind、热云等团队的“数据极客”,秉持易观“让数据能力平民化”的初心,积极拥抱开源,曾贡献过Presto Hbase Connector,Presto Kudu Connector等令开发者称赞的项目。这次他们在公司的支持下,积极地将自己开发的调度工具推动开源,旨在回馈开源的同时,助力打造一个更为强大的开源生态。如果跃跃欲试的想去贡献代码的,贡献流程可以参考这篇博客:分布式任务调度EasyScheduler贡献代码流程。

团队在3月初,小范围(10多家公司)开放了DS的种子用户试用,得到了非常正能量的反馈,在4月初的正式对外开放源码后,很快就获得了许多开发人员的关注兴趣,目前github上的star现在已超过1700个,参与开发和使用的公司包括嘀嗒出行、雪球、凤凰金融、水滴互助、华润万家等,更详细的可以查看:Wanted: Who is using DolphinScheduler #57。

1.2 特点

DolphinScheduler提供了许多易于使用的功能,可加快数据ETL工作开发流程的效率。其主要特点如下:

通过拖拽以DAG 图的方式将 Task 按照任务的依赖关系关联起来,可实时可视化监控任务的运行状态;支持丰富的任务类型;支持工作流定时调度、依赖调度、手动调度、手动暂停/停止/恢复,同时支持失败重试/告警、从指定节点恢复失败、Kill 任务等操作;支持工作流全局参数及节点自定义参数设置;支持集群HA,通过 Zookeeper实现 Master 集群和 Worker 集群去中心化;支持工作流运行历史树形/甘特图展示、支持任务状态统计、流程状态统计;支持补数,并行或串行回填数据。

2 系统架构

2.1 名词解释

流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态任务类型: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子 SUB_PROCESS 也是一个单独的流程定义,是可以单独启动执行的调度方式: 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中 恢复被容错的工作流 和 恢复等待线程 两种命令类型是由调度内部控制使用,外部无法调用定时调度:系统采用 quartz 分布式调度器,并同时支持cron表达式可视化的生成依赖:系统不单单支持 DAG 简单的前驱和后继节点之间的依赖,同时还提供任务依赖节点,支持流程间的自定义任务依赖优先级:支持流程实例和任务实例的优先级,如果流程实例和任务实例的优先级不设置,则默认是先进先出邮件告警:支持 SQL任务 查询结果邮件发送,流程实例运行结果邮件告警及容错告警通知失败策略:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,继续是指不管并行运行任务的状态,直到流程失败结束。结束是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束补数:补历史数据,支持区间并行和串行两种补数方式

2.2 架构

关于更详细的系统架构设计可以查看官方提供的刘小春(xiaochun.liu)一篇博客DolphinScheduler系统架构设计。

3 部署

3.1 后端部署

后端有2种部署方式,分别为自动化部署和编译源码部署。下面主要介绍下载编译后的二进制包一键自动化部署的方式完成DolphinScheduler后端部署。

3.1.1 基础软件安装

Mysql (5.5+) : 必装JDK (1.8+) : 必装ZooKeeper(3.4.6+) :必装Hadoop(2.6+) :选装, 如果需要使用到资源上传功能,MapReduce任务提交则需要配置Hadoop(上传的资源文件目前保存在Hdfs上)Hive(1.2.1) : 选装,hive任务提交需要安装Spark(1.x,2.x) : 选装,Spark任务提交需要安装PostgreSQL(8.2.15+) : 选装,PostgreSQL PostgreSQL存储过程需要安装

编译时

如果是编译源码

Node.js:毕装Maven:毕装,最好 3.6 版本

注意:EasyScheduler本身不依赖Hadoop、Hive、Spark、PostgreSQL,仅是会调用他们的Client,用于对应任务的运行。

3.1.2 创建部署用户

在所有需要部署调度的机器上创建部署用户(本次以node2、node3节点为例),因为worker服务是以 sudo -u {linux-user} 方式来执行作业,所以部署用户需要有 sudo 权限,而且是免密的。

# 1 创建用户useradd escheduler# 2 设置 escheduler 用户密码passwd escheduler# 3 赋予sudo权限。编辑系统 sudoers 文件# 如果没有编辑权限,以root用户登录,赋予w权限# chmod 640 /etc/sudoersvi /etc/sudoers# 大概在100行,在root下添加如下escheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL# 并且需要注释掉 Default requiretty 一行。如果有则注释,没有没有跳过#Default requiretty########### end ############# 4 切换到 escheduler 用户su escheduler

3.1.3 下载并解压

# 1 创建安装目录sudo mkdir /opt/DolphinScheduler# 2 将DolphinScheduler赋予给escheduler用户sudo chown -R escheduler:escheduler /opt/DolphinScheduler# 3 下载后端。简称escheduler-backendcd /opt/DolphinSchedulerwget /apache/incubator-dolphinscheduler/releases/download/1.1.0/escheduler-1.1.0-backend.tar.gz# 4 解压mkdir escheduler-backendmkdir eschedulertar -zxf escheduler-1.1.0-backend.tar.gz -C eschedulercd escheduler/# 5 目录介绍[escheduler@node2 escheduler]$ tree -L 1.├── bin # 基础服务启动脚本├── conf# 项目配置文件├── install.sh # 一键部署脚本├── lib # 项目依赖jar包,包括各个模块jar和第三方jar├── script # 集群启动、停止和服务监控启停脚本└── sql # 项目依赖sql文件5 directories, 1 file

3.1.4 针对escheduler用户ssh免密配置

# 1 配置SSH免密# 1.1 node2 节点执行# 有提示直接回车ssh-keygen -t rsa# 拷贝到node2和node3。提示输入密码时,输入 escheduler 用户的密码ssh-copy-id -i ~/.ssh/id_rsa.pub escheduler@node2ssh-copy-id -i ~/.ssh/id_rsa.pub escheduler@node3# 1.2 node3 节点执行# 有提示直接回车ssh-keygen -t rsa# 拷贝到node2和node3。提示输入密码时,输入 escheduler 用户的密码ssh-copy-id -i ~/.ssh/id_rsa.pub escheduler@node2ssh-copy-id -i ~/.ssh/id_rsa.pub escheduler@node3

3.1.5 数据库初始化

执行以下命令创建数据库和账号

CREATE DATABASE escheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;-- 设置数据用户escheduler的访问密码为 escheduler,并且不对访问的ip做限制-- 测试环境将访问设置为所有,如果是生产,可以限制只能子网段的ip才能访问('198.168.33.%')GRANT ALL PRIVILEGES ON escheduler.* TO 'escheduler'@'%' IDENTIFIED BY 'escheduler';flush privileges;

创建表和导入基础数据 修改vim /opt/DolphinScheduler/escheduler/conf/dao/data_source.properties中的下列属性

# 大概在第 4 行修改MySQL数据库的urlspring.datasource.url=jdbc:mysql://node1:3306/escheduler?characterEncoding=UTF-8# 用户名。spring.datasource.username=escheduler# 密码。填入上一步IDENTIFIED BY 后面设置的密码spring.datasource.password=escheduler

执行创建表和导入基础数据脚本

# 前面已进入/opt/DolphinScheduler/escheduler-backend目录下,然后执行数据初始化脚本# 最后看到 create escheduler success 表示数据库初始化成功sh ./script/create_escheduler.sh

3.1.6 修改部署目录权限及运行参数

# 1 修改conf/env/目录下的 .escheduler_env.sh 环境变量vim conf/env/.escheduler_env.sh# 将对应的修改为自己的组件或框架的路径export HADOOP_HOME=/opt/hadoop-3.1.2export HADOOP_CONF_DIR=/opt/hadoop-3.1.2/etc/hadoopexport SPARK_HOME1=/opt/spark-2.3.4-bin-hadoop2.7#export SPARK_HOME2=/opt/soft/spark2#export PYTHON_HOME=/opt/soft/pythonexport JAVA_HOME=/usr/local/zulu8/export HIVE_HOME=/opt/apache-hive-3.1.1-bin#export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATHexport PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH# ==========# CDH 版# ==========#export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop#export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.yarn#export SPARK_HOME1=/opt/cloudera/parcels/CDH/lib/spark##export SPARK_HOME2=/opt/soft/spark2##export PYTHON_HOME=/opt/soft/python#export JAVA_HOME=/usr/local/zulu8/#export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive##export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH#export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH

修改install.sh中的各参数,替换成自身业务所需的值,这里只列出了重要的修改项,其它默认不用改即可。

# mysql配置# mysql 地址,端口mysqlHost="192.168.33.3:3306"# mysql 数据库名称mysqlDb="escheduler"# mysql 用户名mysqlUserName="escheduler"# mysql 密码# 注意:如果有特殊字符,请用 \ 转移符进行转移mysqlPassword="escheduler"# conf/config/install_config.conf配置# 注意:安装路径,不要当前路径(pwd)一样。一键部署脚本分发到其它节点时的安装路径installPath="/opt/DolphinScheduler/escheduler-backend"# 部署用户# 注意:部署用户需要有sudo权限及操作hdfs的权限,如果开启hdfs,根目录需要自行创建deployUser="escheduler"# zk集群zkQuorum="192.168.33.3:2181,192.168.33.6:2181,192.168.33.9:2181"# 安装hosts# 注意:安装调度的机器hostname列表,如果是伪分布式,则只需写一个伪分布式hostname即可ips="192.168.33.6,192.168.33.9"# conf/config/run_config.conf配置# 运行Master的机器# 注意:部署master的机器hostname列表masters="192.168.33.6"# 运行Worker的机器# 注意:部署worker的机器hostname列表workers="192.168.33.6,192.168.33.9"# 运行Alert的机器# 注意:部署alert server的机器hostname列表alertServer="192.168.33.6"# 运行Api的机器# 注意:部署api server的机器hostname列表apiServers="192.168.33.6"# 用到邮箱发送邮件时务必配置上邮件服务,否则执行结果发送时会提示失败# cn.escheduler.server.worker.runner.TaskScheduleThread:[249] - task escheduler # failure : send mail failed!java.lang.RuntimeException: send mail failed!# alert配置# 邮件协议,默认是SMTP邮件协议mailProtocol="SMTP"# 邮件服务host。以网易邮箱为例。QQ邮箱的服务为 mailServerHost=""# 邮件服务端口。SSL协议端口 465/994,非SSL协议端口 25mailServerPort="465"# 发送人。# 网易邮箱在 客户端授权密码 获取,具体可以看下图mailSender="*******yore@"# 发送人密码mailPassword="yore***"# 下载Excel路径xlsFilePath="/home/escheduler/xls"#是否启动监控自启动脚本# 开关变量,在1.0.3版本中增加,控制是否启动自启动脚本(监控master,worker状态,如果掉线会自动启动) # 默认值为"false"表示不启动自启动脚本,如果需要启动改为"true"monitorServerState="true"# 资源中心上传选择存储方式:HDFS,S3,NONEresUploadStartupType="HDFS"# 如果resUploadStartupType为HDFS,defaultFS写namenode地址,支持HA,需要将core-site.xml和hdfs-site.xml放到conf目录下# 如果是S3,则写S3地址,比如说:s3a://escheduler,注意,一定要创建根目录/eschedulerdefaultFS="hdfs://192.168.33.3:8020"# resourcemanager HA配置,如果是单resourcemanager,这里为yarnHaIps=""yarnHaIps="192.168.33.3"# 如果是单 resourcemanager,只需要配置一个主机名称,如果是resourcemanager HA,则默认配置就好singleYarnIp="192.168.33.3"# common 配置# 程序路径programPath="/opt/DolphinScheduler/escheduler-backend"#下载路径downloadPath="/tmp/escheduler/download"# 任务执行路径execPath="/tmp/escheduler/exec"# SHELL环境变量路径shellEnvPath="$installPath/conf/env/.escheduler_env.sh"# 资源文件的后缀resSuffixs="txt,log,sh,conf,cfg,py,java,sql,hql,xml"# api 配置# api 服务端口apiServerPort="12345"

如果使用hdfs相关功能,需要拷贝hdfs-site.xml和core-site.xml到conf目录下

cp $HADOOP_HOME/etc/hadoop/hdfs-site.xml conf/cp $HADOOP_HOME/etc/hadoop/core-site.xml conf/

网易云邮箱服务客户端用户名和密码获取,开启客户端授权码,并获取。

如果 DolphinScheduler 已经安装,则可以通过设置部署的后端服务下的conf/alert.properties文件

#alert type is EMAIL/SMSalert.type=EMAIL# mail server configurationmail.protocol=SMTP# 以网易邮箱为例mail.server.host=# SSL协议端口 465/994,非SSL协议端口 25mail.server.port=465mail.sender=*******yore@mail.passwd=yore***# TLSmail.smtp.starttls.enable=false# SSLmail.smtp.ssl.enable=true#xls.file.path=/home/escheduler/xlsxls.file.path=/home/escheduler/xls# Enterprise WeChat configurationenterprise.wechat.corp.id=xxxxxxxxxxenterprise.wechat.secret=xxxxxxxxxxenterprise.wechat.agent.id=xxxxxxxxxxenterprise.wechat.users=xxxxx,xxxxxenterprise.wechat.token.url=https://qyapi./cgi-bin/gettoken?corpid=$corpId&corpsecret=$secretenterprise.wechat.push.url=https://qyapi./cgi-bin/message/send?access_token=$tokenenterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}

3.1.7 执行脚本一键部署

# 1 一键部署并启动sh install.sh# 2 查看日志[escheduler@node2 escheduler-backend]$ tree /opt/DolphinScheduler/escheduler-backend/logs/opt/DolphinScheduler/escheduler-backend/logs├── escheduler-alert.log├── escheduler-alert-server-node-.out├── escheduler-alert-server.pid├── escheduler-api-server-node-.out├── escheduler-api-server.log├── escheduler-api-server.pid├── escheduler-logger-server-node-.out├── escheduler-logger-server.pid├── escheduler-master.log├── escheduler-master-server-node-.out├── escheduler-master-server.pid├── escheduler-worker.log├── escheduler-worker-server-node-.out├── escheduler-worker-server.pid└── {processDefinitionId}└── {processInstanceId}└── {taskInstanceId}.log# 3 查看Java进程# 3.1 node2# jps -l | grep escheduler[escheduler@node2 escheduler-backend]$ jps31651 WorkerServer # worker服务31784 ApiApplicationServer# api服务31609 MasterServer # master服务31743 AlertServer# alert服务31695 LoggerServer # logger服务# 3.2 node3[escheduler@cdh3 DolphinScheduler]$ jps26678 WorkerServer26718 LoggerServer

错误1:如果查看/opt/DolphinScheduler/escheduler-backend/logs/escheduler-api-server-*.out日志报如下错误

nohup: failed to run command ‘/bin/java’: No such file or directory

解决将JAVA_HOME/bin下的java软连接到/bin下。(每个dolphinscheduler节点都执行)3.1.8 服务进程的说明

由前面我们可以看到,后端服务正常启动后,共有 5 个进程:WorkerServerApiApplicationServerMasterServerAlertServerLoggerServer。另外还有一个 UI,具体说明如下:

3.1.9 dolphinscheduler后端服务启停

# 启动/opt/DolphinScheduler/escheduler-backend/script/start_all.sh# 停止/opt/DolphinScheduler/escheduler-backend/script/stop_all.sh

3.2 前端部署

前端有3种部署方式,分别为自动化部署,手动部署和编译源码部署。这里主要使用自动化脚本方式部署DolphinScheduler前端服务。

3.2.1 下载并解压

# 1 下载 UI 前端。简称escheduler-ui# 在node2节点下的 /opt/DolphinScheduler wget /apache/incubator-dolphinscheduler/releases/download/1.1.0/escheduler-1.1.0-ui.tar.gz# 2 解压mkdir escheduler-uitar -zxf escheduler-1.1.0-ui.tar.gz -C escheduler-uicd escheduler-ui

3.2.2 执行自动化部署脚本

执行自动化部署脚本。脚本会提示一些参数,根据提示完成安装。

[escheduler@cdh2 escheduler-ui]$ sudo ./install-escheduler-ui.sh欢迎使用easy scheduler前端部署脚本,目前前端部署脚本仅支持CentOS,Ubuntu请在 escheduler-ui 目录下执行linux请输入nginx代理端口,不输入,则默认8888 :8888请输入api server代理ip,必须输入,例如:192.168.xx.xx :192.168.33.6请输入api server代理端口,不输入,则默认12345 :12345=================================================1.CentOS6安装2.CentOS7安装3.Ubuntu安装4.退出=================================================请输入安装编号(1|2|3|4):2…… Complete!port option is needed for addFirewallD is not runningsetenforce: SELinux is disabled请浏览器访问:http://192.168.33.6:8888

使用自动化部署脚本会检查系统环境是否安装了Nginx,如果没有安装则会通过网络自动下载Nginx包安装,通过引导设置后的Nginx配置文件为/etc/nginx/conf.d/escheduler.conf。但生产环境一般法法访问外网,此时可以通过手动离线安装Nginx,然后进行一些配置即可。

# 1 下载 Nginx 离线安装包# 例如下载 Cento7 CPU指令为 x86版本的 wget /packages/mainline/centos/7/x86_64/RPMS/nginx-1.17.6-1.el7.ngx.x86_64.rpm# 2 安装rpm -ivh nginx-1.17.6-1.el7.ngx.x86_64.rpm

下面在手动再Nginx中添加一个DolphinSchedule 服务配置。因为在/etc/nginx/nginx.conf(Nginx默认加载的配置文件)中有include /etc/nginx/conf.d/*.conf ;,所以我们可以在/etc/nginx/conf.d/下创建一个conf后缀的配置文件,配置文件的文件名随意,例如叫escheduler.conf。这里需要特别注意的是在/etc/nginx/nginx.conf配置文件中前面有一个配置user nginx如果启动Nginx的用户不是 nginx,一定要修改为启动Nginx的用户,否则代理的服务会报 403 的错误。这里我们在/etc/nginx/conf.d/escheduler.conf配置如下内容,重点在 server 中配置listen(DolphinSchedule Web UI 的端口)、**root **(解压的escheduler-ui 中的 dist 路径 )、proxy_pass(DolphinSchedule后台接口的地址)等信息。最后重启Nginx执行命令systemctl restart nginx

server {listen 8888; # 访问端口server_name localhost;#charset koi8-r;#access_log /var/log/nginx/host.access.log main;location / {root /opt/DolphinScheduler/escheduler-ui/dist; # 上面前端解压的dist目录地址(自行修改)index index.html index.html;}location /escheduler {proxy_pass http://192.168.33.6:12345; # 接口地址(自行修改)proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header x_real_ipP $remote_addr;proxy_set_header remote_addr $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_http_version 1.1;proxy_connect_timeout 4s;proxy_read_timeout 30s;proxy_send_timeout 12s;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}#error_page 404 /404.html;# redirect server error pages to the static page /50x.html#error_page 500 502 503 504 /50x.html;location = /50x.html {root /usr/share/nginx/html;}}

传文件大小限制

编辑配置文件vim /etc/nginx/nginx.conf

# 更改上传大小client_max_body_size 1024m

3.2.3 dolphinscheduler前端服务启停

# 1 启动systemctl start nginx# 2 状态systemctl status nginx# 3 停止#nginx -s stopsystemctl stop nginx

4 快速开始

浏览器访问http://192.168.33.6:8888,如下图所示。

在上述登陆页面默认的账户的用户名为 admin 密码为escheduler123,这个账户也是系统默认的管理员账户,登陆成功后可以修改密码。成功登陆有主页面如下所示

创建一个队列。队列管理 -> 创建队列 -> 输入名称和队列值 -> 提交。

创建租户。租户管理 -> 创建租户 -> 输入租户编码、租户名称和队列值 -> 提交。

创建普通用户。用户管理 -> 创建用户 -> 输入用户名称、密码、租户名和邮箱,手机号选填 -> 提交。

创建警告组。警告组管理 -> 创建警告组 -> 输入组名称、组类型(邮件、短信)-> 提交。

使用普通用户登录(用户名和密码都是demo)。点击右上角用户名“退出”,重新使用普通用户登录。登陆成功的首页如下。

创建一个项目。点击页面头部的项目管理,进入项目页面,再点击创建项目,创建一个DolphinScheduler任务调度项目,在弹出的框中输入项目名称和描述,例如这里创建一个hello_dolphinScheduler名称的项目,最后点击提交。

项目创建完毕后,在项目管理页面点击我们创建的项目,进入该项目的管理页面。点击工作流定义 -> 创建工作流 -> 在左侧工具栏可以选择(SHELL、USB_PROCESS、PROCEDURE、SQL、SPARK、MapReduce、PYTHON、DEPENDENT)。拖拽SHELL节点到画布,新增一个Shell任务,填写节点名称描述脚本字段;选择任务优先级,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行;超时告警, 填写超时时长,当任务执行时间超过超时时长可以告警并且超时失败。(注意:这里的节点不是机器的节点,而应该是工作流的节点)

确认修改完毕后,点击保存,此时设置DAG图名称,选择组租户,最后添加。

未上线状态的工作流定义可以编辑,但是不可以运行,所以要执行工作流,需要先上线工作流

点击”运行“,执行工作流。运行参数说明:

失败策略:当某一个任务节点执行失败时,其他并行的任务节点需要执行的策略。”继续“表示:其他任务节点正常执行,”结束“表示:终止所有正在执行的任务,并终止整个流程。通知策略:当流程结束,根据流程状态发送流程执行信息通知邮件。流程优先级:流程运行的优先级,分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。worker分组: 这个流程只能在指定的机器组里执行。默认是Default,可以在任一worker上执行。通知组: 当流程结束,或者发生容错时,会发送流程信息邮件到通知组里所有成员。收件人:输入邮箱后按回车键保存。当流程结束、发生容错时,会发送告警邮件到收件人列表。抄送人:输入邮箱后按回车键保存。当流程结束、发生容错时,会抄送告警邮件到抄送人列表。

点击任务实例可以查看每个任务的列表信息,点击操作栏,可以看到任务执行的日志信息。

5 Worker分组和数据源添加

worker分组,提供了一种让任务在指定的worker上运行的机制。管理员创建worker分组,在任务节点和运行参数中设置中可以指定该任务运行的worker分组,如果指定的分组被删除或者没有指定分组,则该任务会在任一worker上运行。worker分组内多个ip地址(不能写别名),以英文逗号分隔。

用管理员用户(admin)登陆Web页面,点击安全中心->Worker分组管理,如下图所示。

创建Worker分组。填写组名称和IP,IP可以是多个,用英文逗号分割即可。

例如下图,我们将Worker的IP分为了两组。

6 添加数据源

脚本(一般是SQL脚本)执行时可能会用到一些数据源,例如MySQL、PostgreSQL、Hive、Impala、Spark、ClickHouse、Oracle、SQL Server,通过添加数据源在DolphinScheduler页面编写Job时直接选择,不用再指定驱动、连接、用户名和密码等信息,可以快速创建一个SQL脚本的工作流Job,同时这个数据源时用户隔离的,每个用户添加的数据源相互独立(admin用户除外,管理员用户可以看到所有用户添加的数据源)。

下面我们以Impala为例,选择页面头部的数据源中心->添加数据源,会弹出下图编辑数据源弹窗,主要填写如下几项。因为Impala没有设置密码,用户为必填可以任意添加一个,在jdbc连接参数中必须添加{"auth":"noSasl"}参数,否则会一直等待确认认证。

其它数据源类似,例如我们添加如下几个数据源,后面会用到ClickHouse(详见我的另一篇博客ClickHouse的安装(含集群方式)和使用)。

7 实例

在项目管理下,点击工作流定义,在工具栏处选择最后一行的DEPENDENT定义一个带依赖的工作流Job,拖动到编辑面板,设置task的节点名为cdh2-task1,在Worker分组中选择执行的Worker节点为cdh2,编辑完这个Task后选择确认添加。选择执行的Worker分组名,这里选择前面设置的cdh2组,确认添加,如下图所示。同样的方式设置第二个依赖Task,将其Worker分组设置到cdh3节点,并添加依赖为

接下来设置两个Shell执行脚本,cdh2-task11上执行task11,主要是在cdh2上执行一个hostname命令,打印执行节点的HostName。同样的方式,在依赖节点cdh3-task21上设置在cdh3执行,也是执行hostname命令。最后再在依赖节点cdh3-task21上添加一个SQL脚本,查询我们的豆瓣电影数据,具体操作如下

在工具栏拖拽添加一个SQL脚本Task节点;节点名称可以叫:ck-task01,并添加描述信息;Worker分组:cdh3;数据源:CLICKHOUSEclickhouse-cdh3;sql类型选择查询表格;邮件信息:填写主题收件人邮箱、抄送人邮箱;sql语句:

SELECT m.id,m.movie_name,m.rating_num,m.rating_people,q.rank,q.quote FROM movie mLEFT JOIN quote qON q.id=m.idORDER BY m.rating_num DESC,m.rating_people DESC LIMIT 10;

1234

各个task编写完毕后,选择右上角的 选择线条连接,工作流编写完毕后如下图,最后点击保存,输入DAG图名称,并选择租户,选择添加保存。

回到工作流定义,可以看到新添加的当前用户的所有工作流列表,点击右侧的操作栏的上线,然后点击运行执行我们的工作流。当然这里也可以添加定时调度。

点击运行后,可以在工作流实例页面看到当前运行的Job的状态信息。每个工作可能会有多个Task构成,查看Task的执行信息可以在 任务实例 页面查看,操作栏可以查看这个task的执行日志信息。如果执行成功后,可以选择工作流的甘特图,在时间轴上查看执行状况。

也可以查看工作流的执行的树形图信息,如下图。

运行成功后填写的收件箱会接收到执行结果的一封邮件,这封邮件中包含了脚本执行的结果。

8 与 Azkaban 的对比

9小节

Apache DolphinScheduler是一个分布式、去中心化、易扩展的可视化DAG工作流任务调度系统,从上面的安装可以看到这个调度系统集成了ZooKeeper,很好的实现了去中心化,每个角色的服务可以起多个,从znode上可以看到mastersworkers的一些元信息都注册在了上面,交由ZK去选举,当然它也是一个分布式的。如果某个服务挂了,ZooKeeper会在剩下的其它节点进行选举,例如当某些节点的Worker服务挂了,我们不用做任何处理,DolphinScheduler上依然可以正常提交和执行工作,在它的监控中心的页面可以看到,系统自动选举出了一个新的Work节点。

# znode上的信息[zk: localhost:2181(CONNECTED) 1] ls /escheduler[tasks_queue, dead-servers, masters, lock, workers, tasks_kill]

123

尤其可以多Worker进行分组以及添加数据源的功能,可以指定Wroker节点,直接指定改用户下的数据,执行SQL脚本,同时页面增加的监控中心、任务状态统计、流程状态统计、流程定义统计等也能很好的帮助我们管理和查看任务执行的信息和集群的状态。

系统架构设计

在对调度系统架构说明之前,我们先来认识一下调度系统常用的名词

1.名词解释

DAG:全称Directed Acyclic Graph,简称DAG。工作流中的Task任务以有向无环图的形式组装起来,从入度为零的节点进行拓扑遍历,直到无后继节点为止。举例如下图:

dag示例

流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG

流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例

任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态

任务类型: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子SUB_PROCESS也是一个单独的流程定义,是可以单独启动执行的

调度方式:系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中恢复被容错的工作流恢复等待线程两种命令类型是由调度内部控制使用,外部无法调用

定时调度:系统采用quartz分布式调度器,并同时支持cron表达式可视化的生成

依赖:系统不单单支持DAG简单的前驱和后继节点之间的依赖,同时还提供任务依赖节点,支持流程间的自定义任务依赖

优先级:支持流程实例和任务实例的优先级,如果流程实例和任务实例的优先级不设置,则默认是先进先出

邮件告警:支持SQL任务查询结果邮件发送,流程实例运行结果邮件告警及容错告警通知

失败策略:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,继续是指不管并行运行任务的状态,直到流程失败结束。结束是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束

补数:补历史数据,支持区间并行和串行两种补数方式

2.系统架构

2.1 系统架构图

系统架构图

2.2 架构说明

MasterServer

MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。

该服务内主要包含:

Distributed Quartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作

MasterSchedulerThread是一个扫描线程,定时扫描数据库中的command表,根据不同的命令类型进行不同的业务操作

MasterExecThread主要是负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理

MasterTaskExecThread主要负责任务的持久化

WorkerServer

WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

该服务包含:

FetchTaskThread主要负责不断从Task Queue中领取任务,并根据不同任务类型调用TaskScheduleThread对应执行器。

LoggerServer是一个RPC服务,提供日志分片查看、刷新和下载等功能

ZooKeeper

ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。 我们也曾经基于Redis实现过队列,不过我们希望EasyScheduler依赖到的组件尽量地少,所以最后还是去掉了Redis实现。

Task Queue

提供任务队列的操作,目前队列也是基于Zookeeper来实现。由于队列中存的信息较少,不必担心队列里数据过多的情况,实际上我们压测过百万级数据存队列,对系统稳定性和性能没影响。

Alert

提供告警相关接口,接口主要包括告警两种类型的告警数据的存储、查询和通知功能。其中通知功能又有邮件通知和**SNMP(暂未实现)**两种。

API

API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。

UI

系统的前端页面,提供系统的各种可视化操作界面,详见**系统使用手册**部分。

2.3 架构设计思想

一、去中心化vs中心化

中心化思想

中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:

Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。

中心化思想设计存在的问题:

一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。

去中心化

在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。

去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠行,则大大增加了上述功能的实现难度。

实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go语言实现的Etcd。

EasyScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务。

二、分布式锁实践

EasyScheduler使用ZooKeeper分布式锁来实现同一时刻只有一台Master执行Scheduler,或者只有一台Worker执行任务的提交。

获取分布式锁的核心流程算法如下EasyScheduler中Scheduler线程分布式锁实现流程图:

三、线程不足循环等待问题

如果一个DAG中没有子流程,则如果Command中的数据条数大于线程池设置的阈值,则直接流程等待或失败。如果一个大的DAG中嵌套了很多子流程,如下图则会产生“死等”状态:

上图中MainFlowThread等待SubFlowThread1结束,SubFlowThread1等待SubFlowThread2结束, SubFlowThread2等待SubFlowThread3结束,而SubFlowThread3等待线程池有新线程,则整个DAG流程不能结束,从而其中的线程也不能释放。这样就形成的子父流程循环等待的状态。此时除非启动新的Master来增加线程来打破这样的”僵局”,否则调度集群将不能再使用。

对于启动新Master来打破僵局,似乎有点差强人意,于是我们提出了以下三种方案来降低这种风险:

计算所有Master的线程总和,然后对每一个DAG需要计算其需要的线程数,也就是在DAG流程执行之前做预计算。因为是多Master线程池,所以总线程数不太可能实时获取。对单Master线程池进行判断,如果线程池已经满了,则让线程直接失败。增加一种资源不足的Command类型,如果线程池不足,则将主流程挂起。这样线程池就有了新的线程,可以让资源不足挂起的流程重新唤醒执行。

注意:Master Scheduler线程在获取Command的时候是FIFO的方式执行的。

于是我们选择了第三种方式来解决线程不足的问题。

四、容错设计

容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况

1. 宕机容错

服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。

Master容错流程图:

ZooKeeper Master容错完成之后则重新由EasyScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。

Worker容错流程图:

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。

注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。

2.任务失败重试

这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:

任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行从当前节点开始执行流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行

接下来说正题,我们将工作流中的任务节点分了两种类型。

一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。

还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。

每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。

如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作

五、任务优先级设计

在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:

按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理。

具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务

其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图

任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。

六、Logback和gRPC实现日志访问

由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:

将日志放到ES搜索引擎上

通过gRPC通信获取远程日志信息

介于考虑到尽可能的EasyScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。

我们使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件。FileAppender主要实现如下:

/*** task log appender*/public class TaskLogAppender extends FileAppender<ILoggingEvent> {...@Overrideprotected void append(ILoggingEvent event) {if (currentlyActiveFile == null){currentlyActiveFile = getFile();}String activeFile = currentlyActiveFile;// thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceIdString threadName = event.getThreadName();String[] threadNameArr = threadName.split("-");// logId = processDefineId_processInstanceId_taskInstanceIdString logId = threadNameArr[1];...super.subAppend(event);}}

以/流程定义id/流程实例id/任务实例id.log的形式生成日志

过滤匹配以TaskLogInfo开始的线程名称:

TaskLogFilter实现如下:

/*** task log filter*/public class TaskLogFilter extends Filter<ILoggingEvent> {@Overridepublic FilterReply decide(ILoggingEvent event) {if (event.getThreadName().startsWith("TaskLogInfo-")){return FilterReply.ACCEPT;}return FilterReply.DENY;}}

如果觉得《工作流调度系统Apache DolphinScheduler介绍和设计原理》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。