概述

本次修改基于 seatunnel v2.3.4 版本

SQL Server 虽然用的少,但业务上还真遇到了需要支持将数据同步到 SQL Server 的情况,将数据同步到已存在的表没问题,然而同步到不存在的表,需要 seatunnel 在自动建表的时候,却报了空指针异常

问题定位

根据日志定位到具体代码行,是在 SqlServerCreateTableSqlBuilder.buildColumnIdentifySql​ ​方法

    private String buildColumnIdentifySql(
            Column column, String catalogName, Map<String, String> columnComments) {
        final List<String> columnSqls = new ArrayList<>();
        columnSqls.add("[" + column.getName() + "]");
        String tyNameDef = "";
        if (StringUtils.equals(catalogName, DatabaseIdentifier.SQLSERVER)) {
            columnSqls.add(column.getSourceType());
        } else {
            // Column name
            SqlType dataType = column.getDataType().getSqlType();
            boolean isBytes = StringUtils.equals(dataType.name(), SqlType.BYTES.name());
            Long columnLength = column.getLongColumnLength();
            Long bitLen = column.getBitLen();
            // 问题就出在这行,bitLen会有null的情况
            bitLen = bitLen == -1 || bitLen <= 8 ? bitLen : bitLen >> 3;
            if (isBytes) {
                if (bitLen > 8000 || bitLen == -1) {
                    columnSqls.add(SqlServerType.VARBINARY.getName());
                } else {
                    columnSqls.add(SqlServerType.BINARY.getName());
                    tyNameDef = SqlServerType.BINARY.getName();
                }
                columnSqls.add("(" + (bitLen == -1 || bitLen > 8000 ? "max)" : bitLen + ")"));
            } else {
               ......
               ......
               ......
            }
            ......
            ......
        }
        ......
        ......
    }

问题就出在

bitLen = bitLen == -1 || bitLen <= 8 ? bitLen : bitLen >> 3;​

这行代码的意义是重新设置 bitLen,但是吧,如果当前字段并不是 bytes 类型,那么它的上一行 Long bitLen = column.getBitLen();​ ​取到的值有可能是 null 的情况

解决方法

我们将取 bitLen 的这两行代码挪到它的下一行 if 块里即可,如果当前字段是 bytes 类型,才去取 biteLen,避免出现空指针的情况

改完之后如下

    private String buildColumnIdentifySql(
            Column column, String catalogName, Map<String, String> columnComments) {
        final List<String> columnSqls = new ArrayList<>();
        columnSqls.add("[" + column.getName() + "]");
        String tyNameDef = "";
        if (StringUtils.equals(catalogName, DatabaseIdentifier.SQLSERVER)) {
            columnSqls.add(column.getSourceType());
        } else {
            // Column name
            SqlType dataType = column.getDataType().getSqlType();
            boolean isBytes = StringUtils.equals(dataType.name(), SqlType.BYTES.name());
            Long columnLength = column.getLongColumnLength();
            // Long bitLen = column.getBitLen();
            // bitLen = bitLen == -1 || bitLen <= 8 ? bitLen : bitLen >> 3;
            if (isBytes) {
                // 把取bitLen的逻辑挪到if里面,避免出现bitLen是null的情况
                Long bitLen = column.getBitLen() == null ? -1L : column.getBitLen();
                bitLen = bitLen == -1 || bitLen <= 8 ? bitLen : bitLen >> 3;
                if (bitLen > 8000 || bitLen == -1) {
                    columnSqls.add(SqlServerType.VARBINARY.getName());
                } else {
                    columnSqls.add(SqlServerType.BINARY.getName());
                    tyNameDef = SqlServerType.BINARY.getName();
                }
                columnSqls.add("(" + (bitLen == -1 || bitLen > 8000 ? "max)" : bitLen + ")"));
            } else {
               ......
               ......
               ......
            }
            ......
            ......
        }
        ......
        ......
    }

oracle自动建表同样的问题

oracle的自动建表相关类 OracleCreateTableSqlBuilder.buildColumnType 方法里面也有一样的问题,下面是原版代码

    private String buildColumnType(Column column) {
        SqlType sqlType = column.getDataType().getSqlType();
        Long columnLength = column.getLongColumnLength();
        // 就算sqlType=BYTES,会出现column.getBitLen()取出来是null的情况
        Long bitLen = column.getBitLen();
        switch (sqlType) {
            case BYTES:
                if (bitLen < 0 || bitLen > 2000) {
                    return "BLOB";
                } else {
                    return "RAW(" + bitLen + ")";
                }
            case STRING:
                if (columnLength > 0 && columnLength < 4000) {
                    return "VARCHAR2(" + columnLength + " CHAR)";
                } else {
                    return "CLOB";
                }
            default:
                String type =
                        oracleDataTypeConvertor.toConnectorType(
                                column.getName(), column.getDataType(), null);
                if (type.equals("NUMBER")) {
                    if (column.getDataType() instanceof DecimalType) {
                        DecimalType decimalType = (DecimalType) column.getDataType();
                        return "NUMBER("
                                + decimalType.getPrecision()
                                + ","
                                + decimalType.getScale()
                                + ")";
                    } else {
                        return "NUMBER";
                    }
                }
                return type;
        }
    }

我们一样将 Long bitLen = column.getBitLen();​ 改成 Long bitLen = column.getBitLen() == null ? -1L : column.getBitLen();​即可

重新打包 connector-jdbc

mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-jdbc -am

将新打出来的 jar 替换 seatunnel 的 connectors 目录下的 connector-jdbc-2.3.4.jar 即可