本次修改基于 seatunnel-2.3.4 版本

修改完之后,增强对 excel 的读取能力,支持 xlsx、xls,支持读取公式单元格的值

相关模块

对 excel 的支持都在 connector-file-base 模块

增强判断 xls、xlsx 文件能力

seatunnel 对 excel 的读取,都在 ExcelReadStrategy​ ​类中,原版对 excel 的读取是根据文件名后缀来判断当前文件是 xlsx 还是 xls,一旦遇到文件名随机生成的情况就无能为力了,如下

    @SneakyThrows
    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
        FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
        Workbook workbook;

        // 根据文件名后缀判断文件类型
        if (path.endsWith(".xls")) {
            workbook = new HSSFWorkbook(file);
        } else if (path.endsWith(".xlsx")) {
            workbook = new XSSFWorkbook(file);
        } else {
            throw new FileConnectorException(
                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                    "Only support read excel file");
        }

        Sheet sheet =
                pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())
                        ? workbook.getSheet(
                                pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))
                        : workbook.getSheetAt(0);
        cellCount = seaTunnelRowType.getTotalFields();
        cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();
        ......
        ......
        ......
    }

我们把它改成使用 poi 的 WorkbookFactory.create​ ​方法来创建 workbook,让 poi 自己判断当前是什么文件类型即可

    @SneakyThrows
    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
        FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
        Workbook workbook;

        // 让poi自己判断文件类型,创建对应的 workbook
        workbook = WorkbookFactory.create(file);

        Sheet sheet =
                pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())
                        ? workbook.getSheet(
                                pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))
                        : workbook.getSheetAt(0);
        cellCount = seaTunnelRowType.getTotalFields();
        cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();
        ......
        ......
        ......
    }

支持读取公式单元格值

读取单元格值的方法在 ExcelReadStrategy​ ​的 getCellValue​ ​方法内,原版如下

    private Object getCellValue(CellType cellType, Cell cell) {
        switch (cellType) {
            case STRING:
                return cell.getStringCellValue();
            case BOOLEAN:
                return cell.getBooleanCellValue();
            case NUMERIC:
                if (DateUtil.isCellDateFormatted(cell)) {
                    DataFormatter formatter = new DataFormatter();
                    return formatter.formatCellValue(cell);
                }
                return cell.getNumericCellValue();
            case ERROR:
                break;
            default:
                throw new FileConnectorException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        String.format("[%s] type not support ", cellType));
        }
        return null;
    }

可以看到,它只对 String、Boolean、Numeric、Error 做了处理,一旦遇到公式类型,直接报错

所以我们可以增加一个判断,遇到 FORMULA​ ​类型,就使用 poi 提供的 FormulaEvaluator​ ​来读取公式单元格的结果值的类型

修改 getCellValue​ ​方法,让它能从调用方处接收一个 FormulaEvaluator​ ​的参数,避免内部多次 new

针对 FORMULA​ ​类型,根据结果值的类型,调用 cell 不同的方法获取值

    private Object getCellValue(CellType cellType, Cell cell, FormulaEvaluator evaluator) {
        switch (cellType) {
            case STRING:
                return cell.getStringCellValue();
            case BOOLEAN:
                return cell.getBooleanCellValue();
            case NUMERIC:
                if (DateUtil.isCellDateFormatted(cell)) {
                    DataFormatter formatter = new DataFormatter();
                    return formatter.formatCellValue(cell);
                }
                return cell.getNumericCellValue();
            case FORMULA:
                CellType formulaResultType = evaluator.evaluateFormulaCell(cell);
                switch (formulaResultType) {
                    case STRING:
                        return cell.getStringCellValue();
                    case BOOLEAN:
                        return cell.getBooleanCellValue();
                    case NUMERIC:
                        if (DateUtil.isCellDateFormatted(cell)) {
                            DataFormatter formatter = new DataFormatter();
                            return formatter.formatCellValue(cell);
                        }
                        return cell.getNumericCellValue();
                    case BLANK:
                    case ERROR:
                        break;
                    default:
                        throw new FileConnectorException(
                                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                                String.format("[%s] formula result type not support ", formulaResultType));
                }
                break;
            case BLANK:
            case ERROR:
                break;
            default:
                throw new FileConnectorException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        String.format("[%s] type not support ", cellType));
        }
        return null;
    }

既然 getCellValue​ ​方法需要接收 FormulaEvaluator​ ​类型的参数,那么在调用方 read​ ​方法中就得 new 一个 FormulaEvaluator​

接下来就再回到 read​ ​方法,new 一个 FormulaEvaluator​,在调用 getCellValue​ ​的地方传进去

    @SneakyThrows
    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
        Map<String, String> partitionsMap = parsePartitionsByPath(path);
        FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
        Workbook workbook;

        // 让poi自己判断文件类型,创建对应的 workbook
        workbook = WorkbookFactory.create(file);

        // 创建FormulaEvaluator,调用的地方在最下面
        FormulaEvaluator evaluator = workbook.getCreationHelper().createFormulaEvaluator();

        Sheet sheet =
                pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())
                        ? workbook.getSheet(
                                pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))
                        : workbook.getSheetAt(0);
        cellCount = seaTunnelRowType.getTotalFields();
        cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();
        ......
        ......
        ......

        IntStream.range((int) skipHeaderNumber, rowCount)
                .mapToObj(sheet::getRow)
                .filter(Objects::nonNull)
                .forEach(
                        rowData -> {
                            int[] cellIndexes =
                                    indexes == null
                                            ? IntStream.range(0, cellCount).toArray()
                                            : indexes;
                            int z = 0;
                            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(cellCount);
                            for (int j : cellIndexes) {
                                Cell cell = rowData.getCell(j);
                                seaTunnelRow.setField(
                                        z++,
                                        cell == null
                                                ? null
                                                : convert(
                                                        // 在这里将 FormulaEvaluator 传进去
                                                        getCellValue(cell.getCellType(), cell, evaluator),
                                                        fieldTypes[z - 1]));
                            }
                            if (isMergePartition) {
                                int index = seaTunnelRowType.getTotalFields();
                                for (String value : partitionsMap.values()) {
                                    seaTunnelRow.setField(index++, value);
                                }
                            }
                            seaTunnelRow.setTableId(tableId);
                            output.collect(seaTunnelRow);
                        });
    }

到此,seatunnel 就很好的支持了 xlsx、xls 的读取

然而,继续再看看 ExcelReadStrategy​ 类的 convert​ 方法,发现它对于各个类型的解析是直接用 xxx.parseXXX​ 来做转换的,一旦单元格的内容不符合转换的格式,会抛出异常,导致整个任务失败,比如浮点型字符串前后有空白符,转换就会失败,再比如浮点型转成 Integer,也会失败,容错率不高。

所以咱们再增强一下它对于类型转换的容错率

增强类型转换容错率

根据上面所说的,咱们看看 convert​ ​方法,原版对于 DOUBLE、BIGINT 等类型是直接用类的 parse 方法转换的,容错率不太友好

    @SneakyThrows
    private Object convert(Object field, SeaTunnelDataType<?> fieldType) {
        if (field == null) {
            return "";
        }
        SqlType sqlType = fieldType.getSqlType();
        switch (sqlType) {
            case MAP:
            case ARRAY:
                return objectMapper.readValue((String) field, fieldType.getTypeClass());
            case STRING:
                return field;
            case DOUBLE:
                return Double.parseDouble(field.toString());
            case BOOLEAN:
                return Boolean.parseBoolean(field.toString());
            case FLOAT:
                return (float) Double.parseDouble(field.toString());
            case BIGINT:
                return (long) Double.parseDouble(field.toString());
            case INT:
                return (int) Double.parseDouble(field.toString());
            case TINYINT:
                return (byte) Double.parseDouble(field.toString());
            case SMALLINT:
                return (short) Double.parseDouble(field.toString());
            case DECIMAL:
                return BigDecimal.valueOf(Double.parseDouble(field.toString()));
            ...
            ...
            ...
        }
    }

我们对它进行一点小改造,让它类型转换的时候兼容性高一点

先在 org.apache.seatunnel.connectors.seatunnel.file​ 包下新增一个包 utils​,在里面新增一个工具类 StrToNumberUtil​,提供字符串转成各种类型的工具方法

public class StrToNumberUtil {
    // 字符串转double
    public static Double str2Double(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        try {
            return Double.parseDouble(str.trim());
        } catch (Exception e) {
            return null;
        }
    }

    // 字符串转long
    public static Long str2Long(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        str = str.trim();
        // 多个小数点,不是数字,pass
        if (str.indexOf('.') != str.lastIndexOf('.')) {
            return null;
        }
        // 取整数位
        String sub = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;
        try {
            return Long.parseLong(sub);
        } catch (Exception e) {
            return null;
        }
    }

    // 字符串转byte
    public static Byte str2Byte(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::byteValue).orElse(null);
    }

    // 字符串转short
    public static Short str2Short(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::shortValue).orElse(null);
    }

    // 字符串转integer
    public static Integer str2Int(String s) {
        return Optional.ofNullable(str2Long(s)).map(Long::intValue).orElse(null);
    }

    // 字符串转float
    public static Float str2Float(String s) {
        return Optional.ofNullable(str2Double(s)).map(Double::floatValue).orElse(null);
    }

    // 字符串转BigDecimal
    public static BigDecimal str2BigDecimal(String s) {
        if (StringUtils.isBlank(s)) {
            return null;
        }
        try {
            return new BigDecimal(s.trim());
        } catch (Exception e) {
            return null;
        }
    }

}

然后在 ExcelReadStrategy​ 类的 convert​ 方法中使用这些方法替代原版的类型转化逻辑

    @SneakyThrows
    private Object convert(Object field, SeaTunnelDataType<?> fieldType) {
        if (field == null) {
            return "";
        }
        SqlType sqlType = fieldType.getSqlType();
        switch (sqlType) {
            case MAP:
            case ARRAY:
                return objectMapper.readValue((String) field, fieldType.getTypeClass());
            case STRING:
                // 这里改使用 toString 方法,避免后续类型转换出错
                return field.toString();
            case DOUBLE:
                // 使用工具方法
                return StrToNumberUtil.str2Double(field.toString());
            case BOOLEAN:
                return Boolean.parseBoolean(field.toString());
            case FLOAT:
                // 使用工具方法
                return StrToNumberUtil.str2Float(field.toString());
            case BIGINT:
                // 使用工具方法
                return StrToNumberUtil.str2Long(field.toString());
            case INT:
                // 使用工具方法
                return StrToNumberUtil.str2Int(field.toString());
            case TINYINT:
                // 使用工具方法
                return StrToNumberUtil.str2Byte(field.toString());
            case SMALLINT:
                // 使用工具方法
                return StrToNumberUtil.str2Short(field.toString());
            case DECIMAL:
                // 使用工具方法
                return StrToNumberUtil.str2BigDecimal(field.toString());
            ...
            ...
            ...
        }
    }

到此,seatunnel 对 excel 读取的支持又增强了一些

最后打包

全局生效

如果想让其他基于 connector-file-base 的插件都生效,那就打包 connector-file-base 模块

mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-file/connector-file-base -am

然后将 connector-file-base 的 jar 放进 seatunnel 部署目录的 lib 目录下,所有基于 connector-file-base 的插件都会生效

部分插件生效

想让某些基于 connector-file-base 的插件生效,那就只重新打包那一个插件即可

mvn clean package -DskipTests=true -pl [你想要生效的插件路径] -am

用新 jar 替换旧 jar 即可

其他

打包的时候可能会因为什么原因导致 maven 的 spotless 插件报错,试试先跑一下 mvn spotless:apply​,再去跑打包