Kettle 数据集成平台:从桌面客户端到 Web 管理系统
引言:Kettle 与企业数据集成
Kettle(Kettle Extraction Transportation Transformation Load),现称为 Pentaho Data Integration,是开源领域最著名的 ETL(Extract-Transform-Load)工具之一。作为数据处理引擎,它在企业数据仓库、数据集成、数据清洗等场景中扮演着重要角色。
Kettle 的核心组件
Kettle 提供了一套完整的数据集成解决方案:
| 组件 | 功能 | 使用场景 |
|---|---|---|
| Spoon | 图形化设计器 | 设计转换(Transformation)和作业(Job) |
| Kitchen | 作业执行引擎 | 命令行方式执行作业 |
| Pan | 转换执行引擎 | 命令行方式执行转换 |
| Carte | Web 服务容器 | 提供远程执行能力 |
| Encr | 密码加密工具 | 保护敏感信息 |
传统使用方式
┌─────────────────────────────────────────────────────┐
│ Kettle 传统使用模式 │
├─────────────────────────────────────────────────────┤
│ │
│ 1. 设计阶段 │
│ ┌──────────────┐ │
│ │ Spoon │ ──→ 设计转换/作业 │
│ │ (GUI客户端) │ │
│ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 资源库 │ ──→ 存储到 DB 或文件 │
│ │ (Repository) │ │
│ └──────────────┘ │
│ │
│ 2. 执行阶段 │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Kitchen │ │ Pan │ │
│ │ (作业执行) │ │ (转换执行) │ │
│ └──────────────┘ └──────────────┘ │
│ │ │ │
│ └─────────┬───────────┘ │
│ ▼ │
│ 从资源库读取并执行 │
│ │
└─────────────────────────────────────────────────────┘
桌面客户端的痛点
在实际的企业应用中,传统的 Kettle 桌面客户端模式存在以下局限性:
1. 部署和访问限制
| 问题 | 描述 | 影响 |
|---|---|---|
| 本地安装依赖 | Spoon 是 Java 桌面应用,需要本地安装 Java 运行环境 | 运维成本高,跨平台兼容性问题 |
| 资源库访问限制 | 需要配置数据库连接,网络环境要求高 | 远程协作困难 |
| 版本一致性 | 不同客户端版本可能存在兼容性问题 | 团队协作效率降低 |
2. 调度和监控能力不足
graph TD
A[Kitchen 命令行] --> B[操作系统定时任务]
B --> C[执行作业]
C --> D[日志文件]
style A fill:#ff006e,stroke:#d6005c,color:#b3004e
style B fill:#00fff5,stroke:#00b8b0,color:#008b82
style D fill:#faff00,stroke:#e6d900,color:#c9b800
传统调度方式的问题:
- 依赖操作系统定时任务(crontab、Windows Task Scheduler)
- 执行状态分散在各个节点
- 缺乏统一的监控界面
- 失败重试机制需要手动实现
- 多节点调度协调困难
3. 企业级集成挑战
┌─────────────────────────────────────────────────────┐
│ 企业级应用集成挑战 │
├─────────────────────────────────────────────────────┤
│ │
│ ❌ 无法与现有权限系统集成 │
│ ❌ 缺乏审计日志和操作追踪 │
│ ❌ 难以实现细粒度的权限控制 │
│ ❌ 多租户场景支持不足 │
│ ❌ API 集成能力有限 │
│ ❌ 无法嵌入现有管理系统 │
│ │
└─────────────────────────────────────────────────────┘
Web 化解决方案:数据集成平台
为解决上述问题,基于 Kettle 的 Java API 构建了一套 Web 版本的数据集成平台,实现了以下核心能力:
系统架构
┌─────────────────────────────────────────────────────────────────┐
│ 数据集成平台架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 前端层 (Angular 5) │ │
│ │ ┌────────────┬────────────┬──────────────────────┐ │ │
│ │ │ 作业设计器 │ 调度管理 │ 监控面板 │ │ │
│ │ └────────────┴────────────┴──────────────────────┘ │ │
│ └────────────────────┬────────────────────────────────┘ │
│ │ HTTP/JSON │
│ ┌────────────────────▼────────────────────────────────┐ │
│ │ API 网关层 │ │
│ └────────────────────┬────────────────────────────────┘ │
│ │ Dubbo RPC │
│ ┌────────────────────▼────────────────────────────────┐ │
│ │ 服务注册中心 (ZooKeeper) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ 服务发现 & 负载均衡 │ │ │
│ │ │ • di-job-service (作业管理服务) │ │ │
│ │ │ • di-kettle-service (Kettle 执行服务) │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └────────────────────┬────────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼────────────────────────────────┐ │
│ │ 微服务层 (Spring Boot) │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ │ │
│ │ │ Job Service │ │ Kettle Service │ │ │
│ │ │ • 作业 CRUD │ │ • 作业执行 │ │ │
│ │ │ • 调度配置 │ │ • 进度监控 │ │ │
│ │ │ • Quartz 调度 │ │ • 日志收集 │ │ │
│ │ └──────────────────┘ └──────────────────┘ │ │
│ └────────────────────┬────────────────────────────────┘ │
│ │ │
│ ┌────────────────────▼────────────────────────────────┐ │
│ │ 数据层 │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ │ │
│ │ │ MySQL │ │ Kettle 资源库 │ │ │
│ │ │ (元数据) │ │ (作业定义) │ │ │
│ │ └──────────────────┘ └──────────────────┘ │ │
│ └───────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
核心技术实现
1. 作业管理服务(Job Service)
基于 Spring Boot + MyBatis Plus 实现作业的元数据管理和调度配置。
作业实体定义
/**
* 数据集成作业实体
*/
@Data
@TableName("di_job")
public class DiJob {
/**
* 作业主键ID
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 作业名称
*/
@NotBlank(message = "作业名称不能为空")
private String jobName;
/**
* 作业类型:TRANSFORMATION(转换) / JOB(作业)
*/
private String jobType;
/**
* Kettle 资源库中的路径
*/
private String repositoryPath;
/**
* 调度表达式 (Cron)
*/
private String cronExpression;
/**
* 作业状态:0-禁用, 1-启用, 2-运行中
*/
private Integer status;
/**
* 创建时间
*/
private Date createTime;
/**
* 更新时间
*/
private Date updateTime;
}
REST API 设计
/**
* 数据集成作业控制器
*/
@RestController
@RequestMapping("/api/di/job")
public class DiJobController {
@Autowired
private DiJobService diJobService;
/**
* 分页查询作业列表
*/
@GetMapping("/page")
public Result<Page<DiJob>> page(
@RequestParam(defaultValue = "1") Integer current,
@RequestParam(defaultValue = "10") Integer size,
DiJob query) {
Page<DiJob> page = diJobService.page(
new Page<>(current, size),
new QueryWrapper<DiJob>()
.like(StringUtils.isNotBlank(query.getJobName()),
"job_name", query.getJobName())
.eq(query.getStatus() != null, "status", query.getStatus())
.orderByDesc("create_time")
);
return Result.success(page);
}
/**
* 创建作业
*/
@PostMapping
public Result<DiJob> create(@Valid @RequestBody DiJob diJob) {
diJob.setCreateTime(new Date());
diJob.setStatus(0); // 默认禁用状态
diJobService.save(diJob);
return Result.success(diJob);
}
/**
* 启动作业调度
*/
@PostMapping("/{id}/start")
public Result<Void> startJob(@PathVariable Long id) {
diJobService.startSchedule(id);
return Result.success();
}
/**
* 停止作业调度
*/
@PostMapping("/{id}/stop")
public Result<Void> stopJob(@PathVariable Long id) {
diJobService.stopSchedule(id);
return Result.success();
}
/**
* 立即执行作业
*/
@PostMapping("/{id}/execute")
public Result<Void> execute(@PathVariable Long id) {
diJobService.executeOnce(id);
return Result.success();
}
}
2. Quartz 调度集成
使用 Quartz 实现作业的定时调度,支持 Cron 表达式配置。
Quartz 管理器
/**
* Quartz 调度管理器
*/
@Component
public class QuartzManager {
@Autowired
private Scheduler scheduler;
/**
* 创建调度任务
*/
public void createSchedule(Long jobId, String cronExpression) {
try {
// 构建任务详情
JobDetail jobDetail = JobBuilder.newJob(KettleJob.class)
.withIdentity("JOB_" + jobId, "KETTLE_GROUP")
.usingJobData("jobId", jobId)
.build();
// 构建触发器
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("TRIGGER_" + jobId, "KETTLE_GROUP")
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
// 注册到调度器
scheduler.scheduleJob(jobDetail, trigger);
// 启动调度器(如果未启动)
if (!scheduler.isStarted()) {
scheduler.start();
}
} catch (SchedulerException e) {
throw new RuntimeException("创建调度任务失败", e);
}
}
/**
* 删除调度任务
*/
public void deleteSchedule(Long jobId) {
try {
JobKey jobKey = new JobKey("JOB_" + jobId, "KETTLE_GROUP");
scheduler.deleteJob(jobKey);
} catch (SchedulerException e) {
throw new RuntimeException("删除调度任务失败", e);
}
}
/**
* 暂停调度任务
*/
public void pauseSchedule(Long jobId) {
try {
JobKey jobKey = new JobKey("JOB_" + jobId, "KETTLE_GROUP");
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
throw new RuntimeException("暂停调度任务失败", e);
}
}
/**
* 恢复调度任务
*/
public void resumeSchedule(Long jobId) {
try {
JobKey jobKey = new JobKey("JOB_" + jobId, "KETTLE_GROUP");
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
throw new RuntimeException("恢复调度任务失败", e);
}
}
}
Quartz 任务执行
/**
* Kettle 作业执行任务
*/
public class KettleJob implements Job {
@Override
public void execute(JobExecutionContext context) {
// 从 JobDataMap 获取作业ID
Long jobId = context.getJobDetail().getJobDataMap().getLong("jobId");
// 执行 Kettle 作业
executeKettleJob(jobId);
}
private void executeKettleJob(Long jobId) {
// 实现细节:调用 Kettle API 执行作业
// 包括:资源库连接、作业加载、执行、日志记录等
}
}
3. Kettle 执行服务
封装 Kettle API,提供作业和转换的执行能力。
作业执行核心逻辑
/**
* Kettle 执行服务
*/
@Service
public class DiKettleService {
/**
* 执行作业
*/
public void executeJob(String jobPath, Map<String, Object> params) {
try {
// 初始化 Kettle 环境
KettleEnvironment.init();
// 连接资源库
Repository repository = connectRepository();
// 加载作业
JobMeta jobMeta = repository.loadJob(
jobPath,
null
);
// 创建作业实例
Job job = new Job(repository, jobMeta);
// 设置参数
if (params != null) {
params.forEach(job::setVariable);
}
// 执行作业
job.start();
// 等待执行完成
job.waitUntilFinished();
// 记录执行日志
logExecution(job);
} catch (Exception e) {
throw new RuntimeException("执行 Kettle 作业失败", e);
}
}
/**
* 连接 Kettle 资源库
*/
private Repository connectRepository() throws KettleException {
// 根据 DB 配置连接资源库
// 返回 Repository 实例
}
}
执行状态监控
/**
* 作业执行状态监听器
*/
public class JobExecutionListener {
/**
* 开始执行
*/
public void onJobStart(Long jobId, String executionId) {
// 更新执行状态为运行中
// 记录开始时间
}
/**
* 执行完成
*/
public void onJobComplete(Long jobId, String executionId, boolean success) {
// 更新执行状态
// 记录结束时间
// 计算执行时长
// 保存执行日志
}
/**
* 执行失败
*/
public void onJobError(Long jobId, String executionId, Exception e) {
// 更新执行状态为失败
// 记录错误信息
// 触发告警
}
}
4. Dubbo 服务治理
通过 Dubbo 实现服务的分布式部署和治理。
服务提供者配置
<!-- dubbo-provider.xml -->
<dubbo:application name="di-kettle-provider" />
<dubbo:registry
address="zookeeper://localhost:2181"
client="curator" />
<dubbo:protocol
name="dubbo"
port="20880" />
<dubbo:service
interface="com.datacolour.di.api.DiKettleService"
ref="diKettleServiceImpl"
timeout="300000"
retries="0" />
服务消费者配置
<!-- dubbo-consumer.xml -->
<dubbo:application name="di-kettle-consumer" />
<dubbo:registry
address="zookeeper://localhost:2181" />
<dubbo:reference
id="diKettleService"
interface="com.datacolour.di.api.DiKettleService"
timeout="300000"
check="false" />
5. 多节点调度与负载均衡
基于 ZooKeeper 实现多节点的协调和负载分发。
┌───────────────────────────────────────────────────────────┐
│ 多节点调度架构 │
├───────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ ZooKeeper │ │
│ │ /election │ ──→ Leader 选举 (调度节点) │
│ │ /services │ ──→ 服务注册列表 │
│ └──────────────┘ │
│ │ │
│ ├───────────────────────────────────────┐ │
│ │ │ │
│ ┌────▼────┐ ┌───▼────┐ │
│ │ Node 1 │ │ Node 2 │ │
│ │ Leader │ │Follower│ │
│ │ │ │ │ │
│ │ Quartz │ │ Quartz │ │
│ │ Scheduler│ │Scheduler│ │
│ └────┬────┘ └───┬────┘ │
│ │ │ │
│ └──────────────┬───────────────────────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 执行节点池 │ │
│ │ (负载均衡) │ │
│ └──────────────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ┌────▼───┐ ┌────▼───┐ ┌───▼────┐ │
│ │Worker 1│ │Worker 2│ │Worker 3│ │
│ │执行作业 │ │执行作业 │ │执行作业 │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
└───────────────────────────────────────────────────────────┘
服务注册与发现
/**
* 服务注册组件
*/
@Component
public class ServiceRegistry {
@Value("${dubbo.application.name}")
private String serviceName;
@Value("${server.port}")
private int serverPort;
@PostConstruct
public void registerService() {
// 获取本地 IP
String localIp = getLocalIp();
// 构建服务地址
String serviceAddress = localIp + ":" + serverPort;
// 注册到 ZooKeeper
zkClient.createEphemeral(
"/services/" + serviceName + "/" + serviceAddress
);
}
}
技术要点总结
| 特性 | 实现方案 |
|---|---|
| 微服务架构 | Spring Boot + Dubbo 实现服务拆分 |
| 服务治理 | ZooKeeper 实现服务注册与发现 |
| 任务调度 | Quartz 框架实现 Cron 调度 |
| 负载均衡 | 多节点部署,Leader 选举机制 |
| 数据访问 | MyBatis Plus 简化 CRUD 操作 |
| API 设计 | RESTful 风格的 HTTP 接口 |
| 前端框架 | Angular 5 (现在应称为 Angular) |
| 状态管理 | 数据库存储作业执行状态 |
系统界面
演示程序:NoPublic




