云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

画像笔记23- 作业流程调度(2)(作业流程图怎么画)

jxf315 2025-04-11 00:04:04 教程文章 29 ℃

6.脚本实例

在Airflow中,简单地说,task脚本是需要被一个个调起执行的脚本,DAG脚本是管理task脚本执行顺序、执行触发条件的。在Airflow 调度开发中主要需要维护的是DAG脚本。

下面通过一个具体的例子来了解。


注1:os.environ :os.environ['环境变量名称']='新环境变量值'

注2:sys.path 返回的是一个列表!

该路径已经添加到系统的环境变量了,当我们要添加自己的搜索目录时,可以通过列表的append()方法;

对于模块和自己写的脚本不在同一个目录下,在脚本开头加sys.path.append('xxx')


注3:os.path.join()函数用于路径拼接文件路径。

os.path.join()Python中的方法會智能地連接一個或多個路徑組件。

此方法將各個路徑組成部分與每個非空部分之後的最後一個路徑組成部分恰好用一個目錄分隔符(/)串聯在一起。如果要連接的最後一個路徑組件為空,則將目錄分隔符('/')放在末尾。

举例:

path = "/home"

# Join various path components

print(os.path.join(path, "User/Desktop", "file.txt"))

输出:

/home/User/Desktop/file.txt

在该脚本中,首先定义了需要引入的依赖包,定义了默认的参数配置及DAG参数和调度时间。其中default_args 的默认配置中主要定义了如下参数:

  • depends_on_past: 是否依赖上游任务,即上一个调度任务执行失败时,是否执行该任务。可选项包括True 和False ,False 表示当前执行脚本不依赖上有执行任务是否成功;
  • start_time:表示首次任务的执行日期;
  • email:设定当任务执行失败时,用于接受失败报警邮件的邮箱地址;
  • email_on_failure:当任务执行失败时,是否发送邮件。可选项包括True 和False,True 表示失败时将发送邮件;
  • retries:表示执行失败时是否重新调起任务执行,1表示会重新调起;
  • retry_delay:表示重新掉漆执行任务的时间间隔。

在DAG的定义中,除了引入上树的默认配置(default_args = default_args)外,还定义了该DAG脚本的dag_id 为userprofile_dag,定时调度为每天早上7点。中间两行为配置脚本的运行的环境变量。(上面阴影字体已经解释)


上面这段脚本中引入了需要执行的task_id,并对DAG进行了实例化。其中对userlabel_task1 这个task_id来说,里面的bash_command 参数对应具体执行这个task任务的脚本,可理解为Linux下提交的shell命令。userlabel_execute1.py文件为执行加工用户订单量对应的的脚本。Trigger_rule 参数为该task任务执行的触发条件,官方文档里面触发条件有5种状态,一般常用的为"All_done" 和 "ALL_SUCCESS"两种。其中ALL_DONE为当上一个task执行完成时,该task即可执行,而ALL_SUCCESS 为只有当上一个task执行成功时,该task才能调起执行,执行失败时,本task不执行任务。

"airflow_run >> userlabel_task1" 命令为task 脚本的调度顺序,在该命令中先执行"airflow_run"任务后执行"userlabel_task1"任务。配置完成后,可以在Airflow 的Web端管理界面的“Graph View" 选项下面看到上文配置的调度依赖流程图,如下图所示:


Airflow 下用户画像调度流图

常用命令行

Airflow 通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用的命令:

  • airflow list_tasks userprofile: 该命令用于查看当前DAG 任务下的所有task 列表,其中userprofile是DAG名称。
  • airflow test userprofile age_task 20180701:该命令用于测试DAG下面某个task 是否能正常执行,其中userprofile是DAG名称,age_task是其中一个task的名称。
  • airflow backfill -s 2018-07-01 -e 2018-07-02 userprofile:该命令用于调起整个DAG脚本执行任务,其中userprofile是DAG名称,2018-07-01 是脚本执行的开始日期。

工程化调度方案

在工程实践中,对于用户画像每天的ETL调度工作,除了标签的调度,还包括同步数据到服务层,数据的监控预警(标签预警、同步到服务层的预警等)。

下面详细介绍工程化调度中覆盖的模块,通过该调度方案可以把前面介绍的标签开发、同步数据到服务层、服务层调用数据等开发内容的知识点全部串联起来,使读者对用户画像整体方案有一个宏观上的认识。

从下图可以看出,用户画像工程的调度主要可划分为2个模块,包括在数据仓库进行的标签计算,以及数据写入服务层,下面详细进行介绍。


1. 标签计算

标签计算主要用于每天通过ETL将标签打在用户身上,包括统计类标签、规则类标签、机器学习标签等。对应的ETL脚本执行作业过程中如果失败,Airflow支持失败后重试。

脚本示例如下


从上面的脚本中可以看到检查上游任务的task失败后会每隔5分钟重试一次(retry_delay),最多重试10*12 次(retries)。下面标签计算的task执行触发条件(trigger_rule) 是上游任务执行成功(ALL_SUCCESS)所以在上有任务执行失败重试时,标签计算的任务不会调起执行。bash_command是提交执行任务的命令,该命令中提交执行对应的任务脚本。

标签计算完成后,校验当天标签的产出是否正常,当校验通过后继续进行输出到服务层的任务,否则任务失败重试或任务挂起。

2. 数据写入服务层

在ETL任务执行到服务层时,将对应的标签数据写入服务层对应的数据库中。如对接本公司的营销平台,则将数据写入到Hbase、Elasticsearch等数据库,或对接

第三方营销平台,通过接口的方式将数据输出到第三方营销平台去。

3. 服务层调用

服务层通过接口方式调用符合业务需求的用户数据。下面举两个应用场景。

场景一:通过对存储在HBase中的用户数据在Elasticsearch中创建二级索引的方式,支持到组合标签筛选对应的用户,进而对其进行特定的营销动作。

场景二:通过传入用户id来查询该用户身上带有的标签信息,进而对其进行个性化的营销、服务等行为。

看到6.3 待续。

Tags:

最近发表
标签列表