SeaTunnel Spark 适配器源码深度解析(二):数据源适配层
本文是源码解析系列的第二篇,聚焦数据源适配层实现。通过本文可掌握:
- SeaTunnel Source 到 Spark DataSource 的转换逻辑
- 批流统一的分区策略设计
- 状态管理机制的工程实现
1. 架构转换全景图
flowchart LR
A[SeaTunnelSource] -->|API适配| B(SeaTunnelSourceSupport)
B -->|Spark 2.4| C[SparkDataSourceV1Adapter]
B -->|Spark 3.3| D[SparkDataSourceV2Adapter]
C & D --> E[Spark SQL Engine]
2. 核心代码拆解
2.1 批流统一接口设计
// 源码位置:seatunnel-engine/spark/src/main/java/org/apache/seatunnel/spark/source/SeaTunnelSourceSupport.java
public abstract class SeaTunnelSourceSupport {
// 批处理入口
public Dataset<Row> toBatchData(SparkSession spark, Config pluginConfig) {
return spark.read()
.format("seatunnel")
.option("mode", "batch")
.load();
}
// 流处理入口
public Dataset<Row> toStreamData(SparkSession spark, Config pluginConfig) {
return spark.readStream()
.format("seatunnel")
.option("mode", "stream")
.load();
}
}
设计亮点:
- 通过
mode
参数区分批流,底层共用同一套读取逻辑 - 自动识别 Spark 版本选择 V1/V2 接口
2.2 分区策略实现
// 源码位置:seatunnel-engine/spark/src/main/java/org/apache/seatunnel/spark/source/partition/PartitionStrategy.java
public interface PartitionStrategy {
// 协调模式(全局有序)
List<Partition> coordinatePartitions();
// 并行模式(局部有序)
List<Partition> parallelPartitions(int parallelism);
// 动态调整(流处理)
default void adjustForStreaming(long watermark) {
throw new UnsupportedOperationException();
}
}
典型场景:
- CDC 数据源:采用协调模式保证全局事件顺序
- 文件数据源:使用并行模式按文件块划分
2.3 状态管理机制
// 源码位置:seatunnel-engine/spark/src/main/java/org/apache/seatunnel/spark/source/state/StateManager.java
public class StateManager {
// 保存状态到 Checkpoint
public void snapshotState(Checkpoint checkpoint) {
stateBackend.put(
"sources/" + sourceId,
ByteBuffer.wrap(serializer.serialize(currentState))
);
}
// 从失败恢复
public void restoreState(Checkpoint checkpoint) {
currentState = serializer.deserialize(
stateBackend.get("sources/" + sourceId).array()
);
}
}
实现要点:
- 状态存储支持 HDFS/S3/本地文件系统
- 使用 Kryo 进行高效序列化
- 定期清理过期状态文件
3. 生产实践技巧
3.1 自定义分区策略
// 示例:按时间范围动态分区
public class TimeRangePartitionStrategy implements PartitionStrategy {
@Override
public List<Partition> parallelPartitions(int parallelism) {
return timeRanges.stream()
.map(range -> new TimeRangePartition(range))
.collect(Collectors.toList());
}
}
注册方式:
sources:
- plugin: jdbc
partition_strategy:
class: com.your.package.TimeRangePartitionStrategy
parameters:
time_field: "create_time"
3.2 状态恢复调试
# 查看 Checkpoint 文件内容
hadoop fs -cat /seatunnel/checkpoints/sources/source-1/state | hexdump -C
常见问题:
- 状态文件损坏:添加 CRC 校验码
- 版本不兼容:状态数据中记录 Schema 版本
4. 核心设计思想总结
抽象分层:
- 将数据源差异封装在 SeaTunnel 插件层
- Spark 层只处理通用数据模型
弹性扩展:
- 分区策略支持动态加载
- 状态存储可插拔替换
生产保障:
- 完善的状态恢复测试套件
- 内置分区负载均衡算法
下一篇预告:《SeaTunnel Spark 适配器源码深度解析(三):执行引擎黑盒》将剖析:
- 从 LogicalPlan 到 RDD 的转换过程
- 动态资源分配实现原理
- 自定义优化规则注入机制
文档信息
- 本文作者:Xuxiaotuan
- 本文链接:https://xuyinyin.cn/2025/07/21/seatunnel-spark-sourcecode-part2/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)