将MySQL数据同步到StarRocks需要用到 connector-jdbc、connector-starrocks 这两个插件,没有的需要先安装

编写任务配置文件:mysql_to_starrocks.conf

env {
    "parallelism"=5
    "job.mode"=BATCH
}
source {
    source000 {
        "connection_check_timeout_sec"=100
        driver="com.mysql.cj.jdbc.Driver"
        "fetch_size"=0
        password=123456
        "plugin_name"=Jdbc
        query="select * from `test`.`test_row200009_coum30`"
        "result_table_name"="t.a__tran_source_1721782563326"
        url="jdbc:mysql://192.168.1.10:3308/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=GMT%2B8"
        user=root
        partition_column="id"
    }
}
sink {
    sink000 {
        base-url="jdbc:mysql://192.168.1.20:9030"
        database=test
        nodeUrls=[
            "192.168.1.20:8030"
        ]
        password="123456"
        "plugin_name"=StarRocks
        "save_mode_create_template"="CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`\n(\n    `__auto_increment_field__` bigint NOT NULL AUTO_INCREMENT,\n    ${rowtype_fields}\n) ENGINE = OLAP DUPLICATE KEY(`__fsp_auto_increment_field__`) COMMENT \"testtesttest\" DISTRIBUTED BY HASH (`__fsp_auto_increment_field__`)\n    PROPERTIES\n(\n    \"replication_num\" = \"1\"\n);"
        "starrocks.config" {
            columns="`id`,`y`,`X0`,`X1`,`X2`,`X3`,`X4`,`X5`,`X6`,`X7`,`X8`,`X9`,`X10`,`X11`,`X12`,`X13`,`X14`,`X15`,`X16`,`X17`,`X18`,`X19`,`X20`,`X21`,`X22`,`X23`,`X24`,`X25`,`X26`,`X27`"
            format=JSON
        }
        table="test_001"
        username=root
        batch_max_bytes=10485760
        batch_max_rows=10000
    }
}

命令行单机运行

./bin/seatunnel.sh --config ./mysql_to_starrocks.conf -e local

单机运行到 5w/s 不是问题,有时候甚至可以跑到 10w/s

提升速度的主要参数:

  • source 中设置了 partition_column ,以及 env.parallelism 设置了并发度 5

  • sink 中单独设置了 batch_max_bytes(默认 5242880 [5M])、batch_max_rows(默认 1024) 这两个参数