Overview

Deploying SeaTunnel with Docker really just means using an OpenJDK image to give SeaTunnel a runtime environment: when the container starts, the SeaTunnel directory is simply mounted into it.

A sharp reader might ask: why not bake SeaTunnel into the image itself?

The reason is that we will likely keep developing custom plugins for our own business needs. If every small change meant rebuilding a roughly 4 GB image, deployment would become a chore, so updating individual plugin jars in place is the more practical approach.

SeaTunnel's Zeta engine uses Hazelcast for cluster management. Since Hazelcast is decentralized, a SeaTunnel cluster can consist of a single node or several.

Prerequisites

1. Download the release package: https://www.apache.org/dyn/closer.lua/seatunnel/2.3.4/apache-seatunnel-2.3.4-bin.tar.gz

2. Install the connectors you need. The bin directory of the extracted package contains install-plugin.sh (Linux/Mac) and install-plugin.cmd (Windows):

# Install connectors
# Linux
./bin/install-plugin.sh 2.3.4

# Windows
.\bin\install-plugin.cmd 2.3.4

If the download is too slow, you can install only what you need: open ./connectors/plugin-mapping.properties and comment out the plugins you don't want.
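
For reference, the entries in plugin-mapping.properties are plain key = value lines; commenting one out with # makes install-plugin.sh skip that connector (the kafka line below is illustrative):

```properties
# Keep only the connectors you actually need
seatunnel.source.FakeSource = connector-fake
seatunnel.sink.Console = connector-console
# commented out, so it will not be downloaded:
# seatunnel.source.Kafka = connector-kafka
```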

Alternatively, go into the .m2 directory under the home directory of the current login account, locate the Maven installation SeaTunnel uses, and add the Aliyun mirror to its conf/settings.xml:

<mirror>
  <id>aliyunmaven</id>
  <mirrorOf>*</mirrorOf>
  <name>Aliyun Public Repository</name>
  <url>https://maven.aliyun.com/repository/public</url>
</mirror>

3. If a connector you use requires additional dependency jars (each connector's page on the SeaTunnel website lists them), remember to put those jars in the designated directory.
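
For example, for the JDBC connector that directory is lib under the SeaTunnel home; a minimal sketch, assuming a MySQL driver jar (the filename is a placeholder) sits next to the extracted package:

```shell
# Assumed layout: package extracted to ./apache-seatunnel-2.3.4
SEATUNNEL_HOME=./apache-seatunnel-2.3.4
mkdir -p "$SEATUNNEL_HOME/lib"
# cp ./mysql-connector-j-8.0.33.jar "$SEATUNNEL_HOME/lib/"   # placeholder filename
```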

4. If you have a JDK installed locally, you can check whether a job runs:

# Linux
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local

# Windows
.\bin\seatunnel.cmd --config .\config\v2.batch.config.template -e local

This example uses the fake and console plugins. If these two entries are missing from ./connectors/plugin-mapping.properties, add them first, then rerun ./bin/install-plugin.sh 2.3.4

seatunnel.source.FakeSource = connector-fake
seatunnel.sink.Console = connector-console

Method 1: multiple nodes on a single server (pseudo-cluster)

We deploy 3 nodes here (you can of course deploy one or two instead).

Deployment directory layout

Assume everything is deployed under /home/server/seatunnel-cluster-2.3.4

/home/server/seatunnel-cluster-2.3.4
  |-apache-seatunnel-2.3.4/      # the SeaTunnel 2.3.4 package
  |-data-node01/                 # persisted job data
  |-data-node02/
  |-data-node03/
  |-logs-node01/                 # logs
  |-logs-node02/
  |-logs-node03/
  |-docker-compose.yml
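
The layout above can be created in one pass; a sketch, run from /home/server (the tar line assumes the downloaded package sits in the current directory):

```shell
mkdir -p seatunnel-cluster-2.3.4/data-node01 seatunnel-cluster-2.3.4/data-node02 seatunnel-cluster-2.3.4/data-node03
mkdir -p seatunnel-cluster-2.3.4/logs-node01 seatunnel-cluster-2.3.4/logs-node02 seatunnel-cluster-2.3.4/logs-node03
# tar -xzf apache-seatunnel-2.3.4-bin.tar.gz -C seatunnel-cluster-2.3.4/
```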

Write docker-compose.yml

version: "3.1"
services:
    seatunnel-node01:
        image: openjdk:8-jre-slim
        container_name: seatunnel-node01
        ports:
          - "15801:5801"
        volumes:
          - "./apache-seatunnel-2.3.4:/seatunnel"
          - "./data-node01:/seatunnel-data"
          - "./logs-node01:/tmp/seatunnel/logs"
        environment:
          - "TZ=Asia/Shanghai"
        entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]
    
    seatunnel-node02:
        image: openjdk:8-jre-slim
        container_name: seatunnel-node02
        ports:
          - "25801:5801"
        volumes:
          - "./apache-seatunnel-2.3.4:/seatunnel"
          - "./data-node02:/seatunnel-data"
          - "./logs-node02:/tmp/seatunnel/logs"
        environment:
          - "TZ=Asia/Shanghai"
        entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]
  
    seatunnel-node03:
        image: openjdk:8-jre-slim
        container_name: seatunnel-node03
        ports:
          - "35801:5801"
        volumes:
          - "./apache-seatunnel-2.3.4:/seatunnel"
          - "./data-node03:/seatunnel-data"
          - "./logs-node03:/tmp/seatunnel/logs"
        environment:
          - "TZ=Asia/Shanghai"
        entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]

Modify the startup script

Edit apache-seatunnel-2.3.4/bin/seatunnel-cluster.sh

Find this block (around line 85):

# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi

Comment out the line JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"

The goal is to make SeaTunnel write its logs to /tmp/seatunnel/logs, a directory that our docker-compose.yml maps to the host.

The block should end up as:

# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
# this line is now commented out
#  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi

Modify the Hazelcast cluster configuration

This step requires the host machine's IP; here we assume it is 192.168.10.100.

Edit apache-seatunnel-2.3.4/config/hazelcast.yaml

hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        # List all three nodes as ip:port; use the host's IP and the ports mapped in docker-compose.yml
        member-list:
          - 192.168.10.100:15801
          - 192.168.10.100:25801
          - 192.168.10.100:35801
    port:
      auto-increment: false
      # the port each node listens on
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
  # persist job state
  map:
    engine*:
      map-store:
        enabled: true
        initial-mode: EAGER
        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
        properties:
          type: hdfs
          namespace: /seatunnel-data/imap
          clusterName: seatunnel-cluster
          storage.type: hdfs
          fs.defaultFS: file:///
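
With the script edited and both config files in place, start the three nodes from the directory holding docker-compose.yml (Docker Compose v2 syntax assumed; older installs use docker-compose instead of docker compose):

```shell
docker compose up -d                   # start all three nodes in the background
docker compose logs seatunnel-node01   # inspect a node's startup log to confirm it joined the cluster
# docker compose down                  # stop and remove the containers
```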

Method 2: one node per server

Same as Method 1, except that docker-compose.yml defines only one node, and hazelcast.yaml must list the ip:port of every server.

Assume two nodes, on 192.168.10.100 and 192.168.10.200.

Deployment directory layout

Assume both servers deploy under /home/server/seatunnel-cluster-2.3.4

/home/server/seatunnel-cluster-2.3.4
  |-apache-seatunnel-2.3.4/      # the SeaTunnel 2.3.4 package
  |-data/                        # persisted job data
  |-logs/                        # logs
  |-docker-compose.yml

Write docker-compose.yml

version: "3.1"
services:
    seatunnel-node:
        image: openjdk:8-jre-slim
        container_name: seatunnel-node
        ports:
          - "15801:5801"
        volumes:
          - "./apache-seatunnel-2.3.4:/seatunnel"
          - "./data:/seatunnel-data"
          - "./logs:/tmp/seatunnel/logs"
        environment:
          - "TZ=Asia/Shanghai"
        entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]

Modify the startup script

Same as in the single-server multi-node setup above.

Edit apache-seatunnel-2.3.4/bin/seatunnel-cluster.sh

Find this block (around line 85):

# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi

Comment out the line JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"

The goal is to make SeaTunnel write its logs to /tmp/seatunnel/logs, a directory that our docker-compose.yml maps to the host.

The block should end up as:

# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
# this line is now commented out
#  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi

Modify the Hazelcast cluster configuration

Edit apache-seatunnel-2.3.4/config/hazelcast.yaml

hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        # List both nodes as ip:port; use each host's IP and the port mapped in docker-compose.yml
        member-list:
          - 192.168.10.100:15801
          - 192.168.10.200:15801
    port:
      auto-increment: false
      # the port each node listens on
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
  # persist job state
  map:
    engine*:
      map-store:
        enabled: true
        initial-mode: EAGER
        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
        properties:
          type: hdfs
          namespace: /seatunnel-data/imap
          clusterName: seatunnel-cluster
          storage.type: hdfs
          fs.defaultFS: file:///

Testing job submission from a client

Submit a job via the REST API

Use Postman or curl to send a POST request to /hazelcast/rest/maps/submit-job on any node.

For example: http://192.168.10.100:15801/hazelcast/rest/maps/submit-job?jobName=xxxx

Put the following in the request body:

{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "BATCH",
        "checkpoint.interval" : 10000
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "row.num" : 16,
            "parallelism" : 2,
            "result_table_name" : "fake",
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

With curl, the command is as follows:

curl --location --request POST 'http://192.168.10.100:15801/hazelcast/rest/maps/submit-job?jobName=testJob' \
--header 'Content-Type: application/json' \
--data-raw '{
    "env" : {
        "parallelism" : 2,
        "job.mode" : "BATCH",
        "checkpoint.interval" : 10000
    },
    "source" : [
        {
            "schema" : {
                "fields" : {
                    "name" : "string",
                    "age" : "int"
                }
            },
            "row.num" : 16,
            "parallelism" : 2,
            "result_table_name" : "fake",
            "plugin_name" : "FakeSource"
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}'

If the job is created successfully, the response contains the jobId and jobName.
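
The response body looks roughly like this (values are illustrative):

```json
{
    "jobId": 733584788375666689,
    "jobName": "testJob"
}
```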

Check a job's status via the REST API

Request path [GET] /hazelcast/rest/maps/running-job/:jobId

Assuming the jobId returned when the job was submitted is 987654321:

http://192.168.10.100:15801/hazelcast/rest/maps/running-job/987654321

The curl command:

curl --location --request GET 'http://192.168.10.100:15801/hazelcast/rest/maps/running-job/987654321'

If the job is still running, its details are returned; if it has already finished, the response is just an empty {}.

List finished jobs via the REST API

Request path [GET] /hazelcast/rest/maps/finished-jobs/:state

The state parameter is optional; valid values are FINISHED, CANCELED, FAILED, and UNKNOWABLE.

For example, to list only jobs that finished successfully:

curl --location --request GET 'http://192.168.10.100:15801/hazelcast/rest/maps/finished-jobs/FINISHED'

Other APIs

For other endpoints, see the official documentation: rest-api | Apache SeaTunnel