概述

本次源码解读基于 v2.3.4 版本

v2.3.4 官方文档:About SeaTunnel Engine | Apache SeaTunnel

SeaTunnel 支持 Flink、Spark 引擎启动,也支持自身的 Zeta 引擎启动。

自身的 Zeta 引擎基于 hazelcast 作为分布式集群控制,支持单机、集群运行,支持自治集群(去中心化),省去了用户为 SeaTunnel Engine 集群指定 Master 节点的麻烦,因为它在运行过程中可以自行选择一个 Master 节点,当 Master 节点发生故障时,会自动选择新的 Master 节点。作为 SeaTunnel 的默认引擎,它支持高吞吐、低延迟、强一致性的同步作业操作,速度更快、更稳定、更节省资源、使用方便。

这篇文章我们先大致了解下 SeaTunnel 的启动流程中都干了些什么,后面的文章中我们在慢慢一步一步深入了解

题外话

hazelcast 是一个开源的分布式内存级别的缓存数据库,当然,也可以使用它的各种 API 来实现自己的分布式集群效果,它不需要依赖 Zookeeper 这类额外的中间件来协助 Master 节点选举,使用起来也简单,SeaTunnel 基于它封装了自己的 Zeta 引擎,轻量而且宕机恢复能力也很强,启动速度也极快。(有兴趣的同学可以认真研究下 SeaTunnel 的 Zeta 引擎,参考 Zeta 引擎集成 hazelcast 的方法,做一些自己的分布式集群的小项目,hazelcast 是真的小而强悍!)

废话不多说,开始吧!

启动流程分析

启动脚本找启动类

首先,SeaTunnel 为我们提供了集群启动时使用的启动脚本 bin/seatunnel-cluster.sh,打开这个脚本,大概在 40 行左右,可以看到 APP_MAIN 这个变量,定义了 jvm 启动的入口类是 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer​

APP_JAR=${APP_DIR}/starter/seatunnel-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"
OUT="${APP_DIR}/logs/seatunnel-server.out"

从启动类出发

打开 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer​,里面代码很简单,也就解析命令行传入的参数,构建 Server 实例并启动

// org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer

public class SeaTunnelServer {
    public static void main(String[] args) throws CommandException {
        ServerCommandArgs serverCommandArgs =
                CommandLineUtils.parse(
                        args,
                        new ServerCommandArgs(),
                        EngineType.SEATUNNEL.getStarterShellName(),
                        true);
        SeaTunnel.run(serverCommandArgs.buildCommand());
    }
}

​serverCommandArgs.buildCommand()​ ​返回的是 ServerExecuteCommand​ ​类的实例,SeaTunnel.run​ ​执行的是 ServerExecuteCommand​ ​类实例的 execute​ ​方法

所有我们接下来进入 ServerExecuteCommand​ ​的 execute​ ​方法

// org.apache.seatunnel.core.starter.seatunnel.command.ServerExecuteCommand

    @Override
    public void execute() {
        // 读取配置文件
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {
            seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
        }
        // 创建Hazelcast实例并启动
        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }

看着也不复杂,读取配置文件,创建 Hazelcast 实例并启动

其实关键就是在给 HazelcastInstanceFactory.newHazelcastInstance​ 传入的最后一个参数 new SeaTunnelNodeContext(seaTunnelConfig)​

​HazelcastInstanceFactory.newHazelcastInstance​ ​接收的最后一个参数是 NodeContext,是 Hazelcast 节点整个生命周期都存在的节点上下文,SeaTunnel 就是在 SeaTunnelNodeContext​ ​这个类上启动了 SeaTunnel 的服务端

接着,我们看看 SeaTunnelNodeContext​

// org.apache.seatunnel.engine.server.SeaTunnelNodeContext

public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    // 这个方法会被Hazelcast内部逻辑调用
    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }
}

在 createNodeExtension​ 方法中,创建了 NodeExtension​ 实例,而在 NodeExtension​ 的构造方法中,创建了 SeaTunnelServer​,这个类就是 SeaTunnel 的真正服务端类

然后在 NodeExtension​ ​的 createExtensionServices​ ​方法中,将 SeaTunnelServer​ ​注册到了 hazelcast

关键的 SeaTunnelServer

​org.apache.seatunnel.engine.server.SeaTunnelServer​ ​实现了 ManagedService, MembershipAwareService, LiveOperationsTracker​,这三个接口是​Hazelcast​节点生命周期相关接口,SeaTunnel 也是利用 Hazelcast​生命周期相关的钩子来管理自己的核心功能

咱们来看看 SeaTunnelServer​ ​的字段

// org.apache.seatunnel.engine.server.SeaTunnelServer

public class SeaTunnelServer
        implements ManagedService, MembershipAwareService, LiveOperationsTracker {

    private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);

    public static final String SERVICE_NAME = "st:impl:seaTunnelServer";

    private NodeEngineImpl nodeEngine;
    private final LiveOperationRegistry liveOperationRegistry;

    private volatile SlotService slotService;
    private TaskExecutionService taskExecutionService;
    private CoordinatorService coordinatorService;
    private ScheduledExecutorService monitorService;

    @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;

    private final SeaTunnelConfig seaTunnelConfig;

    private volatile boolean isRunning = true;
}

其中最重要的 4 个组件

  • slotService:槽管理服务,控制任务运行数量,任务 cpu、内存资源分配

  • taskExecutionService:处理客户端提交的任务

  • coordinatorService:集群状态监听

  • seaTunnelHealthMonitor:集群健康状态监测

接着看 SeaTunnelServer​ 的 init​ 方法,上面提到的 4 个组件都在这里初始化了

// org.apache.seatunnel.engine.server.SeaTunnelServer

    @Override
    public void init(NodeEngine engine, Properties hzProperties) {
        this.nodeEngine = (NodeEngineImpl) engine;
        // TODO Determine whether to execute there method on the master node according to the deploy
        // type
        taskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());
        nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
        taskExecutionService.start();
    
        // 初始化slotService,这个方法里面用了单例模式的DCL双重锁检查,因为getSlotService方法很多地方都有调用
        getSlotService();

        // 初始化集群信息监控
        coordinatorService =
                new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());

        // 启动定时任务,定时触发coordinatorService轮询集群的信息并打印出来
        monitorService = Executors.newSingleThreadScheduledExecutor();
        monitorService.scheduleAtFixedRate(
                this::printExecutionInfo,
                0,
                seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
                TimeUnit.SECONDS);
  
        // 初始化集群健康状态监测
        seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());

        // a trick way to fix StatisticsDataReferenceCleaner thread class loader leak.
        // see https://issues.apache.org/jira/browse/HADOOP-19049
        FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");
    }

到这里,SeaTunnel 节点就完成了初始化了,里面涉及到的这几个组件,就不在这里展开跟踪了,后面单独写文章来详细了解

监听 rest-api 请求

官方文档有提到 SeaTunnel 支持 rest-api 方式提交任务,那么是在哪定义的 api 呢,api 的处理逻辑在哪呢?

咱们回到 org.apache.seatunnel.engine.server.NodeExtension​,这里面定义了 hazelcast​节点的生命周期中会触发到的钩子方法

其中 createTextCommandService​ ​方法中,就注册了 http 请求的处理器

// org.apache.seatunnel.engine.server.NodeExtension

    @Override
    public TextCommandService createTextCommandService() {
        return new TextCommandServiceImpl(node) {
            {
                // 获取日志配置信息
                register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
                // 设置日志配置信息
                register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
                // rest-api的get方法处理
                register(HTTP_GET, new RestHttpGetCommandProcessor(this));
                // rest-api的post方法处理
                register(HTTP_POST, new RestHttpPostCommandProcessor(this));
            }
        };
    }

总体流程

最后放一个简单的总体流程图

​​