概述

本次源码解读基于 v2.3.4 版本

v2.3.4 官方文档:About SeaTunnel Engine | Apache SeaTunnel

在上一节 SeaTunnel Zeta 引擎启动流程分析(一) ,我们跟踪到了 SeaTunnelServer 的初始化方法,里面创建了一大堆的 xxxService(slotService、taskExecutionService、seaTunnelHealthMonitor、taskExecutionService),每个 service 职责分明,共同形成了 SeaTunnel 强大的分布式集群运行任务功能,这里根据 service 的职责,由浅入深的方式慢慢理解每个 service 到底是如何运作的

  • slotService:槽管理服务,控制任务运行数量,任务 cpu、内存资源分配

  • seaTunnelHealthMonitor:集群健康状态监测

  • coordinatorService:集群状态监听

  • taskExecutionService:处理客户端提交的任务

本节咱们阅读一下槽服务(slotService)的工作流程,看看它是怎么做资源分配的,以后我们自己工作上或自己的小项目要是遇到需要给任务分配资源的场景,也可以参考它这种方式

概念回顾

引用一句官方的描述:插槽的数量决定了集群节点可以并行运行的任务组的数量。SeaTunnel 引擎是一个数据同步引擎,大多数作业都是 IO 密集型的。

我们打开 seatunnel 的配置文件 seatunnel.yaml,里面有这个一段配置专门配置槽服务的

seatunnel:
  engine:
    ...
    ...
    slot-service:
      # 是否动态分配槽
      dynamic-slot: true
    ...
    ...

如果 dynamic-slot = true,则表示 seatunnel 在启动时不固定多少个槽,而是运行过程中,根据任务需要资源来划分,来了一个任务说需要 1g 内存、1 核 cpu,只要当前资源还充裕,就给它分配一个槽去运行

如果 dynamic-slot = false,则表示 seatunnel 在启动时就固定了有多少个槽,运行过程中,槽用完了,就算资源还充裕,后面来的任务也得等待前面的任务释放槽才能运行

在官方的这个章节【Deploy SeaTunnel Engine Hybrid Mode Cluster】有说明槽服务应该怎么配置

官方描述:

插槽的数量决定了集群节点可以并行运行的任务组的数量。任务所需的槽数公式为 N = 2 + P(任务配置的并行度)。默认情况下,SeaTunnel 引擎中的插槽数量是动态的,即没有数量限制。我们建议将插槽数量设置为节点上 CPU 核心数量的两倍。(因为大多数任务都是 IO 密集型的)

如果想指定槽数量,则将 dynamic-slot 设置为 false,并增加一个配置项 slot-num 即可,如下:

seatunnel:
  engine:
    ...
    ...
    slot-service:
      dynamic-slot: false
      slot-num: 8
    ...
    ...

下面,我们就开始一步一步跟踪源码

源码跟踪

SeaTunnelServer 创建槽服务实例

我们回到 org.apache.seatunnel.engine.server.SeaTunnelServer 这个类的 init 方法,里面调用了 getSlotService() 方法来触发创建槽服务

// org.apache.seatunnel.engine.server.SeaTunnelServer#init

    @Override
    public void init(NodeEngine engine, Properties hzProperties) {
        this.nodeEngine = (NodeEngineImpl) engine;
        // TODO Determine whether to execute there method on the master node according to the deploy
        // type
        taskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());
        nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
        taskExecutionService.start();

        // 这里触发创建槽服务实例
        getSlotService();
        coordinatorService =
                new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
        monitorService = Executors.newSingleThreadScheduledExecutor();
        monitorService.scheduleAtFixedRate(
                this::printExecutionInfo,
                0,
                seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
                TimeUnit.SECONDS);

        seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());

        // a trick way to fix StatisticsDataReferenceCleaner thread class loader leak.
        // see https://issues.apache.org/jira/browse/HADOOP-19049
        FileSystem.Statistics statistics = new FileSystem.Statistics("SeaTunnel");
    }

继续走到 getSlotService 方法,这个方法很简单,而且也是使用懒加载的方式创建的,第一次调用时才创建槽服务实例,并且后续避免重复创建

// org.apache.seatunnel.engine.server.SeaTunnelServer#getSlotService

    /** Lazy load for Slot Service */
    public SlotService getSlotService() {
        if (slotService == null) {
            synchronized (this) {
                if (slotService == null) {
                    SlotService service =
                            new DefaultSlotService(
                                    nodeEngine,
                                    taskExecutionService,
                                    seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                    service.init();
                    slotService = service;
                }
            }
        }
        return slotService;
    }

DefaultSlotService 槽服务内部逻辑

ok,现在我们找到了槽服务对应的类 DefaultSlotService,点进去一看,其实槽服务的内容很少很简单,代码不到三百行,一共才这么几个方法

  • init:初始化槽服务的一些属性,比如创建记录分配了多少资源、剩余多少资源的对象,创建线程定时上传当前集群节点的槽资源分配情况上报给主节点

  • reset:清除原先的状态信息,重新初始化

  • requestSlot:提供给外部调用,给任务申请一个槽资源

  • releaseSlot:提供给外部调用,释放一个槽资源

  • getSlotContext:根据槽信息获取这个槽的上下文信息

  • close:关闭槽服务的,释放线程池

  • (private)selectBestMatchSlot:当有需要申请一个槽资源时,根据一定计算方法,计算出最合理的槽资源返回

  • (private)initFixedSlots:如果 SeaTunnel 的配置文件中,槽服务相关配置设置的是固定槽数量,则会调用这个方法来创建指定数量的槽,资源也会被平均分配给每个槽

  • getWorkerProfile:获取当前集群节点的槽资源分配情况

  • sendToMaster:将当前集群节点的槽资源分配情况发送给主节点

每个方法的实现逻辑都很简单,这里就不一一展开详细讲解了,打开代码一看就懂了

init() 初始化槽服务

// org.apache.seatunnel.engine.server.service.slot.DefaultSlotService#init

    public void init() {
        initStatus = true;
        // 当前槽服务编码,用来区分不同节点的槽服务
        slotServiceSequence = UUID.randomUUID().toString();
        // 槽id -> 槽上下文的映射
        contexts = new ConcurrentHashMap<>();
        // 已经分配出去的槽
        assignedSlots = new ConcurrentHashMap<>();
        // 还没分配出去的槽
        unassignedSlots = new ConcurrentHashMap<>();
        // 还没分配出去的资源(cpu、内存)
        unassignedResource = new AtomicReference<>(new ResourceProfile());
        // 已经分配出去的资源(cpu、内存)
        assignedResource = new AtomicReference<>(new ResourceProfile());
        scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        r ->
                                new Thread(
                                        r,
                                        String.format(
                                                "hz.%s.seaTunnel.slotService.thread",
                                                nodeEngine.getHazelcastInstance().getName())));
        // 如果配置使用固定槽数量,则创建指定数量槽
        if (!config.isDynamicSlot()) {
            initFixedSlots();
        }
        unassignedResource.set(getNodeResource());
        // 定时上报当前节点的槽分配信息到主节点
        scheduledExecutorService.scheduleAtFixedRate(
                () -> {
                    try {
                        LOGGER.fine(
                                "start send heartbeat to resource manager, this address: "
                                        + nodeEngine.getClusterService().getThisAddress());
                        // 上报信息
                        sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                    } catch (Exception e) {
                        LOGGER.warning(
                                "failed send heartbeat to resource manager, will retry later. this address: "
                                        + nodeEngine.getClusterService().getThisAddress());
                    }
                },
                0,
                DEFAULT_HEARTBEAT_TIMEOUT,
                TimeUnit.MILLISECONDS);
    }

initFixedSlots() 创建固定数量槽

// org.apache.seatunnel.engine.server.service.slot.DefaultSlotService#initFixedSlots

    private void initFixedSlots() {
        // 获取最大可用内存
        long maxMemory = Runtime.getRuntime().maxMemory();
        // 配置上填了多少个槽,就创建多少个槽,cpu不做限制,内存则每个槽都是平均值
        for (int i = 0; i < config.getSlotNum(); i++) {
            unassignedSlots.put(
                    i,
                    new SlotProfile(
                            nodeEngine.getThisAddress(),
                            i,
                            new ResourceProfile(
                                    CPU.of(0), Memory.of(maxMemory / config.getSlotNum())),
                            slotServiceSequence));
        }
    }

requestSlot() 申请槽资源

// org.apache.seatunnel.engine.server.service.slot.DefaultSlotService#requestSlot

    @Override
    public synchronized SlotAndWorkerProfile requestSlot(
            long jobId, ResourceProfile resourceProfile) {
        initStatus = false;
        // 根据调用方需要的资源多少,计算出合适的槽资源分配信息
        SlotProfile profile = selectBestMatchSlot(resourceProfile);
        if (profile != null) {
            // 记录下当前分配出去的槽、资源
            profile.assign(jobId);
            assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
            unassignedResource.accumulateAndGet(
                    profile.getResourceProfile(), ResourceProfile::subtract);
            unassignedSlots.remove(profile.getSlotID());
            assignedSlots.put(profile.getSlotID(), profile);
            contexts.computeIfAbsent(
                    profile.getSlotID(),
                    p -> new SlotContext(profile.getSlotID(), taskExecutionService));
        }
        LOGGER.fine(
                String.format(
                        "received slot request, jobID: %d, resource profile: %s, return: %s",
                        jobId, resourceProfile, profile));
        return new SlotAndWorkerProfile(getWorkerProfile(), profile);
    }

releaseSlot() 释放槽资源

// org.apache.seatunnel.engine.server.service.slot.DefaultSlotService#releaseSlot

    @Override
    public synchronized void releaseSlot(long jobId, SlotProfile profile) {
        LOGGER.info(
                String.format(
                        "received slot release request, jobID: %d, slot: %s", jobId, profile));
        if (!assignedSlots.containsKey(profile.getSlotID())) {
            throw new WrongTargetSlotException(
                    "Not exist this slot in slot service, slot profile: " + profile);
        }

        if (!assignedSlots.get(profile.getSlotID()).getSequence().equals(profile.getSequence())) {
            throw new WrongTargetSlotException(
                    "Wrong slot sequence in profile, slot profile: " + profile);
        }

        if (assignedSlots.get(profile.getSlotID()).getOwnerJobID() != jobId) {
            throw new WrongTargetSlotException(
                    String.format(
                            "The profile %s not belong with job %d",
                            assignedSlots.get(profile.getSlotID()), jobId));
        }

        // 将释放的资源记录回去
        assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
        unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::merge);
        profile.unassigned();
        if (!config.isDynamicSlot()) {
            unassignedSlots.put(profile.getSlotID(), profile);
        }
        assignedSlots.remove(profile.getSlotID());
        contexts.remove(profile.getSlotID());
    }