Skip to content

SophiaData/Bigdata_Code_Tutorial

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

261 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Bigdata_Code_Tutorial

Flink CDC 整库实时同步示例仓库,基于 Flink 1.20 + flink-cdc 3.x 将 MySQL 数据同步至 MySQL。

CI License Java Flink Dependabot

模块

模块 说明
sync_database_mysql 整库同步核心:flink-cdc 3.x → MySqlCatalog → JDBC sink,含 SchemaEvolver
flink-demo DataStream / SQL / CDC DDL / UDF 示例与 Mock 数据源
flink-function 可复用 Flink TableFunction 示例
flink-paimon-demo MySQL → Apache Paimon 实时同步示例

快速开始

环境要求

  • Docker 和 Docker Compose(推荐)
  • JDK 11+ + Maven 3.8+(本地开发)

🚀 一键 Docker 测试(推荐)

最简单的方式是使用 Docker Compose 一键启动所有服务:

# 1. 启动所有服务(MySQL + Flink)
chmod +x test-cdc.sh
./test-cdc.sh start

# 2. 访问 Web UI
# Flink UI: http://localhost:18081
# MySQL Source: localhost:33061
# MySQL Sink: localhost:33062

# 3. 运行测试
./test-ddl-changes.sh    # DDL 变更测试
./test-load.sh           # 性能测试

# 4. 停止服务
./test-cdc.sh stop

本地开发环境

构建

# 切换到 JDK 11
export JAVA_HOME=/Users/gaotingkai/Library/Java/JavaVirtualMachines/corretto-11.0.21/Contents/Home
export PATH="$JAVA_HOME/bin:$PATH"

# 编译(跳过测试)
./mvnw -DskipTests package

# 运行单元测试(排除集成测试)
./mvnw test -Dtest='!*IT,!*IntegrationTest,!*FlinkSqlWDSTest'

# 运行覆盖率报告
./mvnw verify -Djacoco.skip=false -pl sync_database_mysql
open sync_database_mysql/target/site/jacoco/index.html

运行同步任务

修改 config.properties 中的 MySQL 连接信息后:

# 在 sync_database_mysql 目录下运行
java -cp target/sync_database_mysql-1.0.0-jar-with-dependencies.jar \
    io.sophiadata.flink.sync.FlinkSqlWDS \
    --config config.properties

详细说明见 FlinkSqlWDS.java

测试和部署

🐳 Docker 一键测试

提供完整的 Docker 测试环境,包括:

  • MySQL 源数据库和目标数据库
  • Flink 集群
  • 自动化测试脚本
  • 性能测试工具

详细使用说明见 DOCKER_TEST_GUIDE.md

🧪 测试类型

测试类型 脚本 说明
基础功能 ./test-cdc.sh start 一键启动所有服务
DDL 变更 ./test-ddl-changes.sh 测试 Schema 变更同步
性能测试 ./test-load.sh 大批量数据同步性能
集成测试 ./mvnw test 单元测试和集成测试

🚀 CI/CD

工具 状态 说明
CI/CD ✅ GitHub Actions lint + unit + IT + package + CodeQL
依赖更新 ✅ Dependabot 每周自动创建依赖 PR
代码格式化 ✅ spotless google-java-format AOSP 风格
代码规范 ✅ checkstyle 阿里巴巴 Java 规范
测试覆盖率 ✅ JaCoCo 覆盖率报告已配置
安全扫描 ✅ CodeQL Java 安全静态分析
依赖审计 ✅ dependency-review GitHub 原生依赖安全审计

详细工程化文档见 docs/reliability/

核心流程

MySQL (Source) ──CDC──> Flink (sync_database_mysql) ──JDBC──> MySQL (Sink)
                           │
                           ├── SchemaEvolver: 自动处理 schema 变更
                           └── CDBBatchSink: 批量写入 + 按表分组

📖 文档

🎯 核心特性

✅ 整库同步

  • 自动发现源数据库所有表
  • 目标表自动添加 sink_ 前缀
  • 支持数据库和表级别的过滤

✅ Schema 变更同步

  • 无需重启任务:ADD/MODIFY/DROP/RENAME COLUMN
  • 无需重启任务:CREATE/DROP TABLE
  • 自动处理:SchemaEvolver 自动同步变更

✅ 高性能

  • 批量写入:CDBBatchSink 按表批量处理
  • 并行处理:可配置并行度
  • 检查点:支持 Exactly-Once 语义

✅ 易测试

  • Docker 环境:一键启动测试环境
  • 自动化测试:DDL 变更和性能测试
  • 实时监控:Flink Web UI 可视化监控

📚 大数据资料合集

扫码订阅 ima 知识库,获取大数据学习资料:

大数据资料合集

Releases

No releases published

Packages

 
 
 

Contributors