云计算、AI、云原生、大数据等一站式技术学习平台

网站首页 > 教程文章 正文

seatunnel 安装体验-基于 docker 提供 flink 环境

jxf315 2025-04-24 01:56:43 教程文章 19 ℃

seatunnel 原名 waterdrop,是一个非常易用高性能、支持实时流式离线批处理海量数据处理产品,架构于Apache SparkApache Flink之上。

本文介绍使用 docker 为 flink 创建 standalone 集群,运行 seatunnel 快速开始任务。

本地机器为 mac,flink 运行在 docker 容器中,部分步骤与效果和官网并不一致。

搭建 flink 集群

docker 的优势在于迁移方便,当创建好 flink 镜像后,编写 docker-compose 配置文件,即可随时随地起一个 flink 的本地集群。

seatunnel 目前版本支持的 flink 引擎版本是 1.9.0。

同时 seatunnel 对项目中 flink 的依赖声明为 provided,flink 版本的向后兼容是存在问题的。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

对于较新版本的 flink 缺少依赖支持,无法运行,所以创建的 flink 的集群同样采用 1.9.0 版本。

version: "3.1"

services:

  jobmanager:
    image: flink:1.9.0-scala_2.11
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
    ports:
      - 8081:8081
    command: jobmanager
    volumes:
      - jobmanager:/flink/jobmanager
    networks:
      - flink
  taskmanager:
    image: flink:1.9.0-scala_2.11
    depends_on:
      - jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 8
    command: taskmanager
    scale: 1
    volumes:
      - taskmanager:/flink/taskmanager
    networks:
      - flink

networks:
  flink:
    driver: bridge

volumes:
  jobmanager:
  taskmanager:

将上面配置保存为 docker-compose.yml 文件,运行 docker-compose up -d 即可启动 standalone 集群。

打包或下载 seatunnel

可以在 github 下载 seatunnel 的 2.x 版本,或者 clone 源码自行打包。

本文 clone 源码,基于 dev 分支自行打包。

在项目源码目录下运行 mvn clean package 打包项目,打包文件位于 home/seatunnel-dist/target 目录下。

解压安装包:

tar -zxf seatunnel-dist-2.0.4-2.11.8-bin.tar.gz

配置 seatunnel

提交 seatunnel 需要依赖本地 flink 安装路径,需在 seatunnel-env.sh 文件中编辑配置 FLINK_HOME 为 flink 的安装路径。

为了能够提交任务到 flink 集群中,依然从 flink 官网下载了 1.9.0 版本的项目,进行解压。

创建任务配置文件

config 目录下,创建 application.conf 文件,内容如下:

env {
  execution.parallelism = 1
}

source {
  SocketStream{
    result_table_name = "fake"
    field_name = "info"
    host = xxx.xxx.xxx.xxx
    port = 19999
  }
}

transform {
  Split{
    separator = "#"
    fields = ["name","age"]
  }
  sql {
    sql = "select * from (select info,split(info) as info_row from fake) t1"
  }
}

sink {
  ConsoleSink {}
}

因为 flink 集群运行在 docker 中,seatunnel 提供的 SourceStream 的 host 配置默认为 localhost,指向 docker 内 taskmanager 所在容器的本地地址,如果不修改的话需要登陆容器启动 nc 服务,这里将其修改为自己本地机器的 ip 地址。

port 配置默认为 9999,这里改为 19999。

启动 nc

nc -l 19999

启动 seatunnel

在 seatunnel 解压目录运行命令,启动任务

./bin/start-seatunnel-flink.sh  --config ./config/application.conf

等待片刻任务启动后,即可在flink web-ui 中看到任务:

测试

在 nc 中输入 xg#1995

任务中配置基于 # 的字符串分割为 nameage 字段。

在 taskmanager 所在容器的 std 输出中即可看到 xg#1995,xg,1995 输出。

docker 启动的 flink 集群并不能很好地采集日志和 std 输出,因此在 flink 的 web-ui 中 Logs 和 Stdout 是没有任何输出的,查看任务的输出需要借助 docker-compose 命令:

docker-compose logs -f taskmanager

Tags:

最近发表
标签列表