
SeaTunnel增强对excel读取能力,支持xlsx、xls、公式单元格
本次修改基于 seatunnel-2.3.4 版本
修改完之后,增强对 excel 的读取能力,支持 xlsx、xls,支持读取公式单元格的值
相关模块
对 excel 的支持都在 connector-file-base 模块
增强判断 xls、xlsx 文件能力
seatunnel 对 excel 的读取,都在 ExcelReadStrategy 类中,原版对 excel 的读取是根据文件名后缀来判断当前文件是 xlsx 还是 xls,一旦遇到文件名随机生成的情况就无能为力了,如下
@SneakyThrows
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
Workbook workbook;
// 根据文件名后缀判断文件类型
if (path.endsWith(".xls")) {
workbook = new HSSFWorkbook(file);
} else if (path.endsWith(".xlsx")) {
workbook = new XSSFWorkbook(file);
} else {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Only support read excel file");
}
Sheet sheet =
pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())
? workbook.getSheet(
pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))
: workbook.getSheetAt(0);
cellCount = seaTunnelRowType.getTotalFields();
cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();
......
......
......
}
我们把它改成使用 poi 的 WorkbookFactory.create 方法来创建 workbook,让 poi 自己判断当前是什么文件类型即可
@SneakyThrows
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
Workbook workbook;
// 让poi自己判断文件类型,创建对应的 workbook
workbook = WorkbookFactory.create(file);
Sheet sheet =
pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())
? workbook.getSheet(
pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))
: workbook.getSheetAt(0);
cellCount = seaTunnelRowType.getTotalFields();
cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();
......
......
......
}
支持读取公式单元格值
读取单元格值的方法在 ExcelReadStrategy 的 getCellValue 方法内,原版如下
private Object getCellValue(CellType cellType, Cell cell) {
switch (cellType) {
case STRING:
return cell.getStringCellValue();
case BOOLEAN:
return cell.getBooleanCellValue();
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
DataFormatter formatter = new DataFormatter();
return formatter.formatCellValue(cell);
}
return cell.getNumericCellValue();
case ERROR:
break;
default:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
String.format("[%s] type not support ", cellType));
}
return null;
}
可以看到,它只对 String、Boolean、Numeric、Error 做了处理,一旦遇到公式类型,直接报错
所以我们可以增加一个判断,遇到 FORMULA 类型,就使用 poi 提供的 FormulaEvaluator 来读取公式单元格的结果值的类型
修改 getCellValue 方法,让它能从调用方处接收一个 FormulaEvaluator 的参数,避免内部多次 new
针对 FORMULA 类型,根据结果值的类型,调用 cell 不同的方法获取值
private Object getCellValue(CellType cellType, Cell cell, FormulaEvaluator evaluator) {
switch (cellType) {
case STRING:
return cell.getStringCellValue();
case BOOLEAN:
return cell.getBooleanCellValue();
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
DataFormatter formatter = new DataFormatter();
return formatter.formatCellValue(cell);
}
return cell.getNumericCellValue();
case FORMULA:
CellType formulaResultType = evaluator.evaluateFormulaCell(cell);
switch (formulaResultType) {
case STRING:
return cell.getStringCellValue();
case BOOLEAN:
return cell.getBooleanCellValue();
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
DataFormatter formatter = new DataFormatter();
return formatter.formatCellValue(cell);
}
return cell.getNumericCellValue();
case BLANK:
case ERROR:
break;
default:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
String.format("[%s] formula result type not support ", formulaResultType));
}
break;
case BLANK:
case ERROR:
break;
default:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
String.format("[%s] type not support ", cellType));
}
return null;
}
既然 getCellValue 方法需要接收 FormulaEvaluator 类型的参数,那么在调用方 read 方法中就得 new 一个 FormulaEvaluator
接下来就再回到 read 方法,new 一个 FormulaEvaluator,在调用 getCellValue 的地方传进去
@SneakyThrows
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
FSDataInputStream file = hadoopFileSystemProxy.getInputStream(path);
Workbook workbook;
// 让poi自己判断文件类型,创建对应的 workbook
workbook = WorkbookFactory.create(file);
// 创建FormulaEvaluator,调用的地方在最下面
FormulaEvaluator evaluator = workbook.getCreationHelper().createFormulaEvaluator();
Sheet sheet =
pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())
? workbook.getSheet(
pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key()))
: workbook.getSheetAt(0);
cellCount = seaTunnelRowType.getTotalFields();
cellCount = partitionsMap.isEmpty() ? cellCount : cellCount + partitionsMap.size();
......
......
......
IntStream.range((int) skipHeaderNumber, rowCount)
.mapToObj(sheet::getRow)
.filter(Objects::nonNull)
.forEach(
rowData -> {
int[] cellIndexes =
indexes == null
? IntStream.range(0, cellCount).toArray()
: indexes;
int z = 0;
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(cellCount);
for (int j : cellIndexes) {
Cell cell = rowData.getCell(j);
seaTunnelRow.setField(
z++,
cell == null
? null
: convert(
// 在这里将 FormulaEvaluator 传进去
getCellValue(cell.getCellType(), cell, evaluator),
fieldTypes[z - 1]));
}
if (isMergePartition) {
int index = seaTunnelRowType.getTotalFields();
for (String value : partitionsMap.values()) {
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
}
到此,seatunnel 就很好的支持了 xlsx、xls 的读取
然而,继续再看看 ExcelReadStrategy 类的 convert 方法,发现它对于各个类型的解析是直接用 xxx.parseXXX 来做转换的,一旦单元格的内容不符合转换的格式,会抛出异常,导致整个任务失败,比如浮点型字符串前后有空白符,转换就会失败,再比如浮点型转成 Integer,也会失败,容错率不高。
所以咱们再增强一下它对于类型转换的容错率
增强类型转换容错率
根据上面所说的,咱们看看 convert 方法,原版对于 DOUBLE、BIGINT 等类型是直接用类的 parse 方法转换的,容错率不太友好
@SneakyThrows
private Object convert(Object field, SeaTunnelDataType<?> fieldType) {
if (field == null) {
return "";
}
SqlType sqlType = fieldType.getSqlType();
switch (sqlType) {
case MAP:
case ARRAY:
return objectMapper.readValue((String) field, fieldType.getTypeClass());
case STRING:
return field;
case DOUBLE:
return Double.parseDouble(field.toString());
case BOOLEAN:
return Boolean.parseBoolean(field.toString());
case FLOAT:
return (float) Double.parseDouble(field.toString());
case BIGINT:
return (long) Double.parseDouble(field.toString());
case INT:
return (int) Double.parseDouble(field.toString());
case TINYINT:
return (byte) Double.parseDouble(field.toString());
case SMALLINT:
return (short) Double.parseDouble(field.toString());
case DECIMAL:
return BigDecimal.valueOf(Double.parseDouble(field.toString()));
...
...
...
}
}
我们对它进行一点小改造,让它类型转换的时候兼容性高一点
先在 org.apache.seatunnel.connectors.seatunnel.file 包下新增一个包 utils,在里面新增一个工具类 StrToNumberUtil,提供字符串转成各种类型的工具方法
public class StrToNumberUtil {
// 字符串转double
public static Double str2Double(String str) {
if (StringUtils.isBlank(str)) {
return null;
}
try {
return Double.parseDouble(str.trim());
} catch (Exception e) {
return null;
}
}
// 字符串转long
public static Long str2Long(String str) {
if (StringUtils.isBlank(str)) {
return null;
}
str = str.trim();
// 多个小数点,不是数字,pass
if (str.indexOf('.') != str.lastIndexOf('.')) {
return null;
}
// 取整数位
String sub = str.indexOf('.') >= 0 ? str.substring(0, str.indexOf('.')) : str;
try {
return Long.parseLong(sub);
} catch (Exception e) {
return null;
}
}
// 字符串转byte
public static Byte str2Byte(String s) {
return Optional.ofNullable(str2Long(s)).map(Long::byteValue).orElse(null);
}
// 字符串转short
public static Short str2Short(String s) {
return Optional.ofNullable(str2Long(s)).map(Long::shortValue).orElse(null);
}
// 字符串转integer
public static Integer str2Int(String s) {
return Optional.ofNullable(str2Long(s)).map(Long::intValue).orElse(null);
}
// 字符串转float
public static Float str2Float(String s) {
return Optional.ofNullable(str2Double(s)).map(Double::floatValue).orElse(null);
}
// 字符串转BigDecimal
public static BigDecimal str2BigDecimal(String s) {
if (StringUtils.isBlank(s)) {
return null;
}
try {
return new BigDecimal(s.trim());
} catch (Exception e) {
return null;
}
}
}
然后在 ExcelReadStrategy 类的 convert 方法中使用这些方法替代原版的类型转化逻辑
@SneakyThrows
private Object convert(Object field, SeaTunnelDataType<?> fieldType) {
if (field == null) {
return "";
}
SqlType sqlType = fieldType.getSqlType();
switch (sqlType) {
case MAP:
case ARRAY:
return objectMapper.readValue((String) field, fieldType.getTypeClass());
case STRING:
// 这里改使用 toString 方法,避免后续类型转换出错
return field.toString();
case DOUBLE:
// 使用工具方法
return StrToNumberUtil.str2Double(field.toString());
case BOOLEAN:
return Boolean.parseBoolean(field.toString());
case FLOAT:
// 使用工具方法
return StrToNumberUtil.str2Float(field.toString());
case BIGINT:
// 使用工具方法
return StrToNumberUtil.str2Long(field.toString());
case INT:
// 使用工具方法
return StrToNumberUtil.str2Int(field.toString());
case TINYINT:
// 使用工具方法
return StrToNumberUtil.str2Byte(field.toString());
case SMALLINT:
// 使用工具方法
return StrToNumberUtil.str2Short(field.toString());
case DECIMAL:
// 使用工具方法
return StrToNumberUtil.str2BigDecimal(field.toString());
...
...
...
}
}
到此,seatunnel 对 excel 读取的支持又增强了一些
最后打包
全局生效
如果想让其他基于 connector-file-base 的插件都生效,那就打包 connector-file-base 模块
mvn clean package -DskipTests=true -pl seatunnel-connectors-v2/connector-file/connector-file-base -am
然后将 connector-file-base 的 jar 放进 seatunnel 部署目录的 lib 目录下,所有基于 connector-file-base 的插件都会生效
部分插件生效
想让某些基于 connector-file-base 的插件生效,那就只重新打包那一个插件即可
mvn clean package -DskipTests=true -pl [你想要生效的插件路径] -am
用新 jar 替换旧 jar 即可
其他
打包的时候可能会因为什么原因导致 maven 的 spotless 插件报错,试试先跑一下 mvn spotless:apply,再去跑打包