
Deploying a SeaTunnel 2.3.4 Cluster with Docker
Overview
Deploying SeaTunnel with Docker simply means using an openjdk image as the runtime environment: when starting the container, mount the SeaTunnel directory into it.
A sharp-eyed reader might ask: why not bake SeaTunnel into the image?
Because we expect to keep developing custom plugins for our own business, and rebuilding and redistributing a roughly 4 GB image for every small change is painful. Overwriting individual plugin jars in place is far more practical.
SeaTunnel's Zeta engine uses Hazelcast for cluster management, and since Hazelcast is decentralized, a SeaTunnel cluster can run with a single node or with many.
Prerequisites
1. Download the release package: https://www.apache.org/dyn/closer.lua/seatunnel/2.3.4/apache-seatunnel-2.3.4-bin.tar.gz
2. Install the connectors you need. The bin directory of the extracted package contains install-plugin.sh (Linux/Mac) and install-plugin.cmd (Windows).
# Install connectors
# Linux
./bin/install-plugin.sh 2.3.4
# Windows
.\bin\install-plugin.cmd 2.3.4
If the download is too slow, install only the connectors you actually need: open ./connectors/plugin-mapping.properties and comment out the plugins you don't want.
Alternatively, find the Maven installation SeaTunnel uses under the .m2 directory in your home folder and add the Aliyun mirror to its conf/settings.xml:
<mirror>
  <id>aliyunmaven</id>
  <mirrorOf>*</mirrorOf>
  <name>Aliyun public repository</name>
  <url>https://maven.aliyun.com/repository/public</url>
</mirror>
3. If a connector needs additional dependency jars (each connector's page on the SeaTunnel website lists them), remember to put those jars in the designated directory.
4. If you already have a local JDK, you can check whether a job runs:
# Linux
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
# Windows
.\bin\seatunnel.cmd --config .\config\v2.batch.config.template -e local
This example uses the fake and console plugins. If ./connectors/plugin-mapping.properties is missing the two entries below, add them first and rerun ./bin/install-plugin.sh 2.3.4:
seatunnel.source.FakeSource = connector-fake
seatunnel.sink.Console = connector-console
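A small shell snippet can add those two mappings idempotently; the relative ./connectors path assumes you run it from the root of the extracted release:

```shell
# Add the fake/console mappings to plugin-mapping.properties if they are missing.
# Assumption: run from the root of the extracted apache-seatunnel-2.3.4 directory.
F=./connectors/plugin-mapping.properties
mkdir -p ./connectors && touch "$F"   # demo scaffolding; the real file ships in the tarball
grep -q '^seatunnel.source.FakeSource' "$F" || echo 'seatunnel.source.FakeSource = connector-fake' >> "$F"
grep -q '^seatunnel.sink.Console' "$F" || echo 'seatunnel.sink.Console = connector-console' >> "$F"
```

Rerunning the snippet is safe; the grep guards prevent duplicate lines.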
1. Method 1: multiple nodes on a single server (pseudo-cluster)
This walkthrough deploys 3 nodes (one or two nodes work just as well).
Deployment directory layout
Assume everything is deployed under /home/server/seatunnel-cluster-2.3.4:
/home/server/seatunnel-cluster-2.3.4
|-apache-seatunnel-2.3.4/ # the extracted SeaTunnel 2.3.4 release
|-data-node01/ # persisted job state
|-data-node02/
|-data-node03/
|-logs-node01/ # logs
|-logs-node02/
|-logs-node03/
|-docker-compose.yml
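The layout above can be created in one pass; the base path below is relative for illustration, while the guide itself uses /home/server/seatunnel-cluster-2.3.4:

```shell
# Create the pseudo-cluster directory layout for three nodes.
# BASE is a relative path here; substitute /home/server/seatunnel-cluster-2.3.4
# (or your own location) in a real deployment.
BASE=./seatunnel-cluster-2.3.4
for i in 01 02 03; do
  mkdir -p "$BASE/data-node$i" "$BASE/logs-node$i"
done
# Extract the release into "$BASE/apache-seatunnel-2.3.4" and place
# docker-compose.yml directly under "$BASE".
```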
Write the docker-compose.yml
version: "3.1"
services:
  seatunnel-node01:
    image: openjdk:8-jre-slim
    container_name: seatunnel-node01
    ports:
      - "15801:5801"
    volumes:
      - "./apache-seatunnel-2.3.4:/seatunnel"
      - "./data-node01:/seatunnel-data"
      - "./logs-node01:/tmp/seatunnel/logs"
    environment:
      - "TZ=Asia/Shanghai"
    entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]
  seatunnel-node02:
    image: openjdk:8-jre-slim
    container_name: seatunnel-node02
    ports:
      - "25801:5801"
    volumes:
      - "./apache-seatunnel-2.3.4:/seatunnel"
      - "./data-node02:/seatunnel-data"
      - "./logs-node02:/tmp/seatunnel/logs"
    environment:
      - "TZ=Asia/Shanghai"
    entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]
  seatunnel-node03:
    image: openjdk:8-jre-slim
    container_name: seatunnel-node03
    ports:
      - "35801:5801"
    volumes:
      - "./apache-seatunnel-2.3.4:/seatunnel"
      - "./data-node03:/seatunnel-data"
      - "./logs-node03:/tmp/seatunnel/logs"
    environment:
      - "TZ=Asia/Shanghai"
    entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]
Modify the startup script
Edit apache-seatunnel-2.3.4/bin/seatunnel-cluster.sh and find this section (around line 85):
# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi
Comment out the line JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
so that SeaTunnel writes its logs to /tmp/seatunnel/logs, the in-container directory that our docker-compose.yml maps to the host.
The section then reads:
# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
# this line is commented out
# JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi
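The same edit can be scripted with sed; the sketch below runs against a throwaway copy of the snippet, so you can verify the substitution before touching the real seatunnel-cluster.sh:

```shell
# Write a throwaway copy of the relevant lines (quoted heredoc: no variable expansion).
cat > /tmp/seatunnel-cluster-snippet.sh <<'EOF'
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
EOF
# Comment out only the line that sets seatunnel.logs.path.
sed -i 's/^\(JAVA_OPTS=.*-Dseatunnel\.logs\.path=.*\)$/# \1/' /tmp/seatunnel-cluster-snippet.sh
grep 'logs.path' /tmp/seatunnel-cluster-snippet.sh   # the line should now start with '# '
```

Once you are happy with the result, point the sed command at the real script instead of the copy.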
Modify the Hazelcast cluster configuration
This step requires the host machine's IP; assume it is 192.168.10.100.
Edit apache-seatunnel-2.3.4/config/hazelcast.yaml:
hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        # list all three nodes as <host IP>:<port published in docker-compose.yml>
        member-list:
          - 192.168.10.100:15801
          - 192.168.10.100:25801
          - 192.168.10.100:35801
    port:
      auto-increment: false
      # the port each node itself listens on (inside the container)
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
  # persist job state
  map:
    engine*:
      map-store:
        enabled: true
        initial-mode: EAGER
        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
        properties:
          type: hdfs
          namespace: /seatunnel-data/imap
          clusterName: seatunnel-cluster
          storage.type: hdfs
          fs.defaultFS: file:///
With the script and configuration in place, start all three nodes from the directory containing docker-compose.yml with docker compose up -d.
2. Method 2: one node per server
Identical to Method 1, except that docker-compose.yml defines only a single node, and hazelcast.yaml must list the ip:port of every server.
Assume two nodes, on 192.168.10.100 and 192.168.10.200.
Deployment directory layout
Assume /home/server/seatunnel-cluster-2.3.4 on each server:
/home/server/seatunnel-cluster-2.3.4
|-apache-seatunnel-2.3.4/ # the extracted SeaTunnel 2.3.4 release
|-data/ # persisted job state
|-logs/ # logs
|-docker-compose.yml
Write the docker-compose.yml
version: "3.1"
services:
  seatunnel-node:
    image: openjdk:8-jre-slim
    container_name: seatunnel-node
    ports:
      - "15801:5801"
    volumes:
      - "./apache-seatunnel-2.3.4:/seatunnel"
      - "./data:/seatunnel-data"
      - "./logs:/tmp/seatunnel/logs"
    environment:
      - "TZ=Asia/Shanghai"
    entrypoint: ["/bin/bash", "-c", "/seatunnel/bin/seatunnel-cluster.sh"]
Modify the startup script
This is the same change as in the multi-node setup above.
Edit apache-seatunnel-2.3.4/bin/seatunnel-cluster.sh and find this section (around line 85):
# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi
Comment out the line JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
so that SeaTunnel writes its logs to /tmp/seatunnel/logs, the in-container directory that our docker-compose.yml maps to the host.
The section then reads:
# Log4j2 Config
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
# this line is commented out
# JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-engine-server"
fi
Modify the Hazelcast cluster configuration
Edit apache-seatunnel-2.3.4/config/hazelcast.yaml:
hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        # list both nodes as <host IP>:<port published in docker-compose.yml>
        member-list:
          - 192.168.10.100:15801
          - 192.168.10.200:15801
    port:
      auto-increment: false
      # the port each node itself listens on (inside the container)
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
  # persist job state
  map:
    engine*:
      map-store:
        enabled: true
        initial-mode: EAGER
        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
        properties:
          type: hdfs
          namespace: /seatunnel-data/imap
          clusterName: seatunnel-cluster
          storage.type: hdfs
          fs.defaultFS: file:///
Then start the node on each server with docker compose up -d, run from the directory containing docker-compose.yml.
3. Submitting jobs from a client
Submit a job via the REST API
Using Postman or curl, send a POST request to /hazelcast/rest/maps/submit-job on any node,
e.g. http://192.168.10.100:15801/hazelcast/rest/maps/submit-job?jobName=xxxx
with the following request body:
{
  "env" : {
    "parallelism" : 2,
    "job.mode" : "BATCH",
    "checkpoint.interval" : 10000
  },
  "source" : [
    {
      "schema" : {
        "fields" : {
          "name" : "string",
          "age" : "int"
        }
      },
      "row.num" : 16,
      "parallelism" : 2,
      "result_table_name" : "fake",
      "plugin_name" : "FakeSource"
    }
  ],
  "sink" : [
    {
      "plugin_name" : "Console"
    }
  ]
}
With curl, the command is:
curl --location --request POST 'http://192.168.10.100:15801/hazelcast/rest/maps/submit-job?jobName=testJob' \
--header 'Content-Type: application/json' \
--data-raw '{
  "env" : {
    "parallelism" : 2,
    "job.mode" : "BATCH",
    "checkpoint.interval" : 10000
  },
  "source" : [
    {
      "schema" : {
        "fields" : {
          "name" : "string",
          "age" : "int"
        }
      },
      "row.num" : 16,
      "parallelism" : 2,
      "result_table_name" : "fake",
      "plugin_name" : "FakeSource"
    }
  ],
  "sink" : [
    {
      "plugin_name" : "Console"
    }
  ]
}'
If the job is created successfully, the response contains the jobId and jobName.
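In a script you will usually want that jobId. A sketch without jq, using a hypothetical response body standing in for the real curl output:

```shell
# Hypothetical submit-job response; in practice capture it with
# RESPONSE=$(curl -s --location --request POST 'http://<node>:<port>/hazelcast/rest/maps/submit-job?jobName=testJob' ...)
RESPONSE='{"jobId":"787461965026689025","jobName":"testJob"}'
# Extract the numeric jobId; tolerates both quoted and unquoted values.
JOB_ID=$(printf '%s' "$RESPONSE" | sed -n 's/.*"jobId"[^0-9]*\([0-9][0-9]*\).*/\1/p')
echo "$JOB_ID"   # → 787461965026689025
```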
Check a job's status via the REST API
Request path (GET): /hazelcast/rest/maps/running-job/:jobId
Assuming the submit call returned jobId 987654321:
http://192.168.10.100:15801/hazelcast/rest/maps/running-job/987654321
With curl:
curl --location --request GET 'http://192.168.10.100:15801/hazelcast/rest/maps/running-job/987654321'
While the job is running, the response contains its details; once it has finished, the response is just an empty {}.
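Since a finished job yields an empty object, a script can test for exactly that; in this sketch the sample RESPONSE stands in for the real curl call:

```shell
# Sample response for a job that has finished; in practice:
# RESPONSE=$(curl -s 'http://192.168.10.100:15801/hazelcast/rest/maps/running-job/987654321')
RESPONSE='{}'
if [ "$RESPONSE" = "{}" ]; then
  echo "job is no longer running"
else
  echo "job is still running"
fi
```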
List finished jobs via the REST API
Request path (GET): /hazelcast/rest/maps/finished-jobs/:state
The state segment is optional; valid values are FINISHED, CANCELED, FAILED, and UNKNOWABLE.
To list only successfully finished jobs, with curl:
curl --location --request GET 'http://192.168.10.100:15801/hazelcast/rest/maps/finished-jobs/FINISHED'
Other APIs
For the remaining endpoints, see the rest-api page in the official Apache SeaTunnel documentation.