Kafka: developing a custom Sink Connector (compatible with MySQL and Oracle)

Published: 2023-12-26 17:54:40 Author: Marydon

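1. Background

There are already quite a few plugins on the market that can consume data from Kafka, such as io.confluent.connect.jdbc.JdbcSinkConnector. In my experience, though, this component has one fatal problem: it can only sync string types.

Specifically: when a date-type column from the source table is inserted into the target table, the value can only be written as a string; it is not converted to a date type automatically.

That makes the component of limited use (it is also possible that I simply did not find the right way to configure it).

Because I was never able to solve the date synchronization problem, I eventually decided to develop my own consumer component.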

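2. Analysis

Consuming Kafka data with a custom component mainly requires two implementations.

First: extend org.apache.kafka.connect.sink.SinkConnector.

The SinkConnector receives the connector configuration and sets up the tasks that consume data from Kafka.

Second: extend org.apache.kafka.connect.sink.SinkTask.

The SinkTask further processes the data received from the subscribed topics.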
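Most of the processing in the SinkTask shown later in this post is converting temporal values. Its comments name the Debezium types io.debezium.time.Timestamp, io.debezium.time.Date, io.debezium.time.ZonedTimestamp and io.debezium.time.MicroTime, and it calls a DateUtils helper whose source is not included here. The following is only a minimal sketch of what such conversions could look like using java.time; the class and method names are hypothetical and are not the author's DateUtils.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helpers for illustration only (not the DateUtils class used in the post).
class TemporalSketch {
    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");

    // io.debezium.time.Timestamp: milliseconds since the epoch
    static String fromEpochMillis(long millis, String zoneId) {
        return DATE_TIME.format(Instant.ofEpochMilli(millis).atZone(ZoneId.of(zoneId)));
    }

    // io.debezium.time.Date: days since the epoch
    static String fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days).toString();
    }

    // io.debezium.time.ZonedTimestamp: ISO-8601 string with an offset, normalized to UTC here
    static String fromIso8601(String iso) {
        return DATE_TIME.format(OffsetDateTime.parse(iso).withOffsetSameInstant(ZoneOffset.UTC));
    }

    // io.debezium.time.MicroTime: microseconds past midnight
    // (this sketch only handles values within a single day)
    static String fromMicrosOfDay(long micros) {
        return TIME.format(LocalTime.ofNanoOfDay(micros * 1_000L));
    }

    public static void main(String[] args) {
        System.out.println(fromEpochMillis(1703584480000L, "UTC")); // 2023-12-26 09:54:40
        System.out.println(fromEpochDays(19717));                   // 2023-12-26
        System.out.println(fromIso8601("2023-12-26T17:54:40+08:00")); // 2023-12-26 09:54:40
        System.out.println(fromMicrosOfDay(3_661_000_000L));        // 01:01:01
    }
}
```

Strings in these shapes match the format patterns the task passes to STR_TO_DATE and TO_DATE in its generated SQL.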

3. Implementation

Project directory structure: (screenshot not reproduced here)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>code.marydon.kafka</groupId>
    <artifactId>marydon-connector-jdbc</artifactId>
    <version>1.0.Final</version>
    <name>kafka-connect-jdbc</name>
    <description>marydon-cdc-jdbc</description>

    <packaging>jar</packaging>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.source.plugin.version>3.8.1</maven.source.plugin.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <version.debezium>2.3.0.Final</version.debezium>
        <version.kafka>3.5.1</version.kafka>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${version.kafka}</version>
            <!-- provided scope: needed at compile time only; the Kafka Connect runtime supplies it -->
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- HikariCP connection pool (HikariDataSource is also Spring Boot's default pool) -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>4.0.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.jdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>19.7.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.database.nls</groupId>
            <artifactId>orai18n</artifactId>
            <version>19.7.0.0</version>
        </dependency>
        <!-- oraclepki: Oracle's public key infrastructure (PKI) package, with tools and APIs for creating, managing, and distributing keys and certificates -->
        <dependency>
            <groupId>com.oracle.database.security</groupId>
            <artifactId>oraclepki</artifactId>
            <version>19.7.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

    </dependencies>
    <build>
        <finalName>${project.artifactId}-${project.version}</finalName><!-- file name produced by the package phase -->
        <plugins>
            <!-- Maven compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin.version}</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
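                            <!-- Copy dependency jars into the lib directory under the build output path -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <!-- Exclude transitive dependencies? No: copy them as well -->
                            <excludeTransitive>false</excludeTransitive>
                            <!-- Strip version numbers? No: keep them in the file names -->
                            <stripVersion>false</stripVersion>
                            <!-- Scope to exclude (jars in this scope are not copied) -->
                            <excludeScope>provided</excludeScope>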
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven assembly (packaging) plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

JdbcSinkConnector.java

import code.marydon.configs.JdbcSinkConfig;
import code.marydon.tasks.JdbcSinkTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Sink connector: reads the configuration, assigns tasks, and does other initialization work
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 10:56
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
@Slf4j
public class JdbcSinkConnector extends SinkConnector {
    private Map<String, String> configProperties;

    // Initialization
    @Override
    public void start(Map<String, String> props) {
        this.configProperties = Map.copyOf(props);

        String topics = props.get(JdbcSinkConfig.TOPICS_REGEX);
        if (StringUtils.isEmpty(topics)) {
            throw new ConnectException("JdbcSinkConnector configuration must include '" + JdbcSinkConfig.TOPICS_REGEX + "' setting");
        }
    }

    // The Task class to run
    @Override
    public Class<? extends Task> taskClass() {
        return JdbcSinkTask.class;
    }

    // Configuration handed to each task
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        log.info("Setting task configurations for {} workers.", maxTasks);
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);

        for(int i = 0; i < maxTasks; i++) {
            configs.add(this.configProperties);
        }

        return configs;
    }

    @Override
    public void stop() {

    }

    // Configuration definition
    @Override
    public ConfigDef config() {
        // Return the configuration definition, which is used to validate the connector configuration
        return JdbcSinkConfig.CONFIG_DEFINITION;
    }

    @Override
    public String version() {
        return "1.0.Final";
    }
}

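Three places in this class may need to change:

First: start().

The start method initializes the parameters, and this is where we can validate them.

JdbcSinkConfig.TOPICS_REGEX corresponds to topics.regex; here I check that the topics.regex parameter is not empty.

Second: taskClass().

The taskClass method determines which task class is run. We can create different task classes for different business needs and return whichever one is required.

Third: config().

The config method usually returns an org.apache.kafka.common.config.ConfigDef instance.

That class serves two purposes:

First, when a client requests http://localhost:8083/connector-plugins/code.marydon.connectors.JdbcSinkConnector/config, it can see which parameters the component exposes.

Second, when a client calls this plugin through the REST API, the request's parameters and their values can be validated.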
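To make the second point concrete: Kafka Connect exposes a validation endpoint, PUT /connector-plugins/{plugin}/config/validate, which checks a proposed configuration against the plugin's ConfigDef. The sketch below only builds such a request with the JDK 11 HTTP client. The helper class name, the topic pattern, and the connection URL are placeholders chosen for illustration, and the worker address is taken from the URL above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for illustration; it is not part of the connector itself.
class ConnectRestSketch {
    // Builds (but does not send) a request to Kafka Connect's validation endpoint:
    // PUT /connector-plugins/{plugin}/config/validate
    static HttpRequest validateRequest(String baseUrl, String pluginClass, String jsonConfig) {
        URI uri = URI.create(baseUrl + "/connector-plugins/" + pluginClass + "/config/validate");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonConfig))
                .build();
    }

    public static void main(String[] args) {
        String plugin = "code.marydon.connectors.JdbcSinkConnector";
        // Placeholder values; replace them with your own settings.
        String json = "{"
                + "\"connector.class\":\"" + plugin + "\","
                + "\"topics.regex\":\"example-topic-.*\","
                + "\"connection.url\":\"jdbc:mysql://localhost/db_name\""
                + "}";
        HttpRequest request = validateRequest("http://localhost:8083", plugin, json);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running worker returns a JSON document whose error_count field shows how many settings failed validation.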

JdbcSinkConfig.java

import code.marydon.utils.TimeZoneValidator;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.Map;
/**
 * Sink connector configuration
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 11:18
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
public class JdbcSinkConfig extends AbstractConfig {
    public static final String CONNECTION_URL = "connection.url";
    public static final String CONNECTION_USER = "connection.user";
    public static final String CONNECTION_PASSWORD = "connection.password";
    public static final String CONNECTION_CLASS = "connection.class";
    public static final String CONNECTION_DRIVER_CLASS_NAME = "connection.driveClassName";// custom parameter
    public static final String TOPICS_REGEX = "topics.regex";
    public static final String TABLE_NAME_FORMAT = "table.name.format";
    private static final String TABLE_NAME_FORMAT_DEFAULT = "${topic}";
    public static final String DELETE_ENABLED = "delete.enabled";// not actually used
    private static final String DELETE_ENABLED_DEFAULT = "false";
    public static final String AUTO_CREATE = "auto.create";// not actually used
    private static final String AUTO_CREATE_DEFAULT = "false";
    public static final String AUTO_EVOLVE = "auto.evolve";// not actually used
    private static final String AUTO_EVOLVE_DEFAULT = "false";
    public static final String INSERT_MODE = "insert.mode";// not actually used
    private static final String INSERT_MODE_DEFAULT = "upsert";
    public static final String PK_FIELDS = "pk.fields";
    public static final String PK_MODE = "pk.mode";// not actually used
    public static final String FIELDS_WHITELIST = "fields.whitelist";// unused
    public static final String DB_TIMEZONE_CONFIG = "db.timezone";// unused
    public static final String DB_TIMEZONE_DEFAULT = "UTC";// unused
    public static final String TASKS_MAX = "tasks.max";
    public static final String COLUMNS_DATE = "columns.date";// custom parameter
    public static final String COLUMNS_MAP = "columns.map";// custom parameter

    public static final ConfigDef CONFIG_DEFINITION;

    static {
        CONFIG_DEFINITION = new ConfigDef()
                .define(CONNECTION_CLASS, Type.STRING, null, Importance.HIGH, "JDBC connection class.", "Connection", 1, Width.MEDIUM, "JDBC Class")
                .define(CONNECTION_URL, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, "JDBC connection URL.\nFor example: ``jdbc:oracle:thin:@localhost:1521:orclpdb1``, ``jdbc:mysql://localhost/db_name``, ``jdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name``", "Connection", 1, Width.LONG, "JDBC URL")
                .define(CONNECTION_USER, Type.STRING, null, Importance.HIGH, "JDBC connection user.", "Connection", 2, Width.MEDIUM, "JDBC User")
                .define(CONNECTION_PASSWORD, Type.PASSWORD, null, Importance.HIGH, "JDBC connection password.", "Connection", 3, Width.MEDIUM, "JDBC Password")
                .define(CONNECTION_DRIVER_CLASS_NAME, Type.STRING, null, Importance.HIGH, "JDBC connection  driver class.", "Connection", 4, Width.MEDIUM, "JDBC Driver Class")
                .define(INSERT_MODE, Type.STRING, INSERT_MODE_DEFAULT, null, Importance.HIGH, "The insertion mode to use. Supported modes are:\n``insert``\n    Use standard SQL ``INSERT`` statements.\n``upsert``\n    Use the appropriate upsert semantics for the target database if it is supported by the connector, e.g. ``INSERT OR IGNORE``.\n``update``\n    Use the appropriate update semantics for the target database if it is supported by the connector, e.g. ``UPDATE``.", "Writes", 1, Width.MEDIUM, "Insert Mode")
                .define(DELETE_ENABLED, Type.BOOLEAN, DELETE_ENABLED_DEFAULT, Importance.MEDIUM, "Whether to treat ``null`` record values as deletes. Requires ``pk.mode`` to be ``record_key``.", "Writes", 3, Width.SHORT, "Enable deletes")
                .define(TABLE_NAME_FORMAT, Type.STRING, TABLE_NAME_FORMAT_DEFAULT, new NonEmptyString(), Importance.MEDIUM, "A format string for the destination table name, which may contain '${topic}' as a placeholder for the originating topic name.\nFor example, ``kafka_${topic}`` for the topic 'orders' will map to the table name 'kafka_orders'.", "Data Mapping", 1, Width.LONG, "Table Name Format")
                .define(PK_MODE, Type.STRING, "none", null, Importance.HIGH, "The primary key mode, also refer to ``pk.fields`` documentation for interplay. Supported modes are:\n``none``\n    No keys utilized.\n``kafka``\n    Kafka coordinates are used as the PK.\n``record_key``\n    Field(s) from the record key are used, which may be a primitive or a struct.\n``record_value``\n    Field(s) from the record value are used, which must be a struct.", "Data Mapping", 2, Width.MEDIUM, "Primary Key Mode")
                .define(PK_FIELDS, Type.LIST, "", Importance.MEDIUM, "List of comma-separated Source Table primary key field names.", "Data Mapping", 3, Width.LONG, "Source Table Primary Key Fields")
                .define(FIELDS_WHITELIST, Type.LIST, "", Importance.MEDIUM, "List of comma-separated record value field names. If empty, all fields from the record value are utilized, otherwise used to filter to the desired fields.\nNote that ``pk.fields`` is applied independently in the context of which field(s) form the primary key columns in the destination database, while this configuration is applicable for the other columns.", "Data Mapping", 4, Width.LONG, "Fields Whitelist")
                .define(DB_TIMEZONE_CONFIG, Type.STRING, DB_TIMEZONE_DEFAULT, TimeZoneValidator.INSTANCE, Importance.MEDIUM, "Name of the JDBC timezone that should be used in the connector when inserting time-based values. Defaults to UTC.", "Data Mapping", 5, Width.MEDIUM, "DB Time Zone")
                .define(AUTO_CREATE, Type.BOOLEAN, AUTO_CREATE_DEFAULT, Importance.MEDIUM, "Whether to automatically create the destination table based on record schema if it is found to be missing by issuing ``CREATE``.", "DDL Support", 1, Width.SHORT, "Auto-Create")
                .define(AUTO_EVOLVE, Type.BOOLEAN, AUTO_EVOLVE_DEFAULT, Importance.MEDIUM, "Whether to automatically add columns in the table schema when found to be missing relative to the record schema by issuing ``ALTER``.", "DDL Support", 2, Width.SHORT, "Auto-Evolve")
                .define(TASKS_MAX, Type.INT, 1, Importance.MEDIUM, "max tasks", "Connection", 5, Width.SHORT, "tasks")
                .define(TOPICS_REGEX, Type.STRING, "", null, Importance.HIGH, "Subscribe topics from kafka", "Data Mapping", 6, Width.MEDIUM, "Subscribe Topics")
                .define(COLUMNS_DATE, Type.STRING, "", null, Importance.HIGH, "date columns in Target Table.\nFor example:date_column1:date_type1,date_column2:date_type2", "Data Mapping", 6, Width.MEDIUM, "date columns")
                .define(COLUMNS_MAP, Type.STRING, "", null, Importance.HIGH, "columns map between Source Table and Target Table.\nFor example:source_table_column1:target_table_column1,source_table_column2:target_table_column2", "Data Mapping", 6, Width.MEDIUM, "columns map")
        ;
    }

    public JdbcSinkConfig(Map<?, ?> props) {
        super(CONFIG_DEFINITION, props);
    }

    public static void main(String... args) {
        System.out.println(CONFIG_DEFINITION.toEnrichedRst());
    }
}

This component's parameters are mainly modeled on the input parameters of the JdbcSinkConnector in confluentinc-kafka-connect-jdbc-10.7.4.

Documentation: https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
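Two of the parameters above, columns.date and columns.map, are custom and take comma-separated name:value pairs (for example source_table_column1:target_table_column1,source_table_column2:target_table_column2). As a rough sketch of how such a value could be parsed once and then looked up per column, assuming case-insensitive source column names, something like the following would work; the class and method names are hypothetical and do not appear in the connector.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper; the names here are illustrative and not taken from the post.
class ColumnPairsSketch {
    // Parses "name1:value1,name2:value2" into an ordered map.
    // Keys are lower-cased so lookups are case-insensitive; blank input gives an empty map.
    static Map<String, String> parsePairs(String raw) {
        Map<String, String> pairs = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return pairs;
        }
        for (String entry : raw.split(",")) {
            String[] kv = entry.trim().split(":", 2);
            if (kv.length != 2 || kv[0].isBlank() || kv[1].isBlank()) {
                throw new IllegalArgumentException("Expected 'name:value' but got '" + entry + "'");
            }
            pairs.put(kv[0].trim().toLowerCase(Locale.ROOT), kv[1].trim());
        }
        return pairs;
    }

    // Returns the mapped value, or the name itself when no entry exists.
    static String lookupOrSelf(Map<String, String> pairs, String name) {
        return pairs.getOrDefault(name.toLowerCase(Locale.ROOT), name);
    }

    public static void main(String[] args) {
        Map<String, String> columnsMap = parsePairs("ID:id,CREATE_TIME:created_at");
        System.out.println(lookupOrSelf(columnsMap, "create_time")); // created_at
        System.out.println(lookupOrSelf(columnsMap, "NAME"));        // NAME
    }
}
```

Parsing up front surfaces a malformed entry as soon as the value is read, and falling back to the source name keeps unmapped columns usable when source and target tables share column names.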

JdbcSinkTask.java

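import code.marydon.configs.JdbcSinkConfig;
import code.marydon.connectors.JdbcSinkConnector;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
// Also required, but their packages are not shown in this post:
// DynamicConnection, DynamicDataSource, DateUtils and Operation
// (Operation is presumably io.debezium.data.Envelope.Operation; that is an assumption).
/**
 * Receives the data subscribed from Kafka and processes it further
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 11:04
 * @version: 1.0
 * @email: marydon20170307@163.com
 */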
@Slf4j
public class JdbcSinkTask extends SinkTask {
    private DynamicConnection connection;
    // Name of the table to sync
    private String tableName = "";
    // Primary key fields
    private String primaryFields = "";
    // Target database type
    private String targetDataBaseName = "";
    // Column mappings between the source table and the target table
    String[] targetColumnsMapArray = new String[0];
    // Date columns of the target table
    String[] targetDateColumnsArray = new String[0];

    @Override
    public String version() {
        return new JdbcSinkConnector().version();
    }

    // Task startup
    @Override
    public void start(Map<String, String> map) {
        String url = map.get(JdbcSinkConfig.CONNECTION_URL);
        String userName = map.get(JdbcSinkConfig.CONNECTION_USER);
        String password = map.get(JdbcSinkConfig.CONNECTION_PASSWORD);
        String driveClassName = map.get(JdbcSinkConfig.CONNECTION_DRIVER_CLASS_NAME);
        log.info("config:{}", map);
        this.connection = DynamicDataSource.getDynamicConnection(url, userName, password, driveClassName);

        tableName = map.get(JdbcSinkConfig.TABLE_NAME_FORMAT);
        primaryFields = map.get(JdbcSinkConfig.PK_FIELDS);
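        targetDataBaseName = driveClassName.contains("mysql") ? "mysql" : "oracle";
        // Columns in the target table that hold date/time values
        String targetDateColumns = map.get(JdbcSinkConfig.COLUMNS_DATE);
        // Mapping between source-table columns and target-table columns
        String targetColumnsMap = map.get(JdbcSinkConfig.COLUMNS_MAP);

        // Keep the arrays empty when a parameter is unset: "".split(",") returns [""], not an empty array
        targetColumnsMapArray = StringUtils.isBlank(targetColumnsMap) ? new String[0] : targetColumnsMap.split(",");
        targetDateColumnsArray = StringUtils.isBlank(targetDateColumns) ? new String[0] : targetDateColumns.split(",");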
    }

    // Receive a batch of records
    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            for (SinkRecord record : records) {
                // Process each record
                this.syncData(record);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void syncData(SinkRecord record) throws SQLException {
        Struct value = (Struct)record.value();
        if (null == value) return;
        log.debug("Record to sync: {}", value);

        // Struct key = (Struct)record.key();
        // Schema valueSchema = value.schema();
        Struct source = value.getStruct("source");
        // String sourceDatabaseName = source.getString("db");
        // String tableName = source.getString("table");
        // Source database type
        String sourceConnectorName = source.getString("connector");
        // String timestamp = String.valueOf(value.getInt64("ts_ms"));
        String operation = value.getString("op");
        Struct beforeStruct = value.getStruct("before");
        Struct afterStruct = value.getStruct("after");

        String sql;
        ArrayList<String> fieldNames;
        ArrayList<String> fieldParams;
        ArrayList<Object> fieldValues;
        String fieldName;
        Object fieldValue;

        if (operation.equals(Operation.READ.code()) || operation.equals(Operation.CREATE.code())) {// r or c
            fieldNames = new ArrayList<>();
            fieldParams = new ArrayList<>();
            fieldValues = new ArrayList<>();

            // Source column name
            String sourceColumn;
            // Target column name
            String targetColumn;
            // Target date column name
            String targetDateColumnName;
            // Target date column type
            String targetDateColumnType;

            for (Field afterField : afterStruct.schema().fields()) {
                fieldName = afterField.name();
                fieldValue = afterStruct.get(fieldName);
                // Default: without a matching columns.map entry, the target column
                // is assumed to have the same name as the source column
                targetColumn = fieldName;

                // Map the source column to its target column (columns.map)
                for (String tcm : targetColumnsMapArray) {
                    String[] pair = tcm.split(":");
                    if (pair.length < 2) continue;
                    sourceColumn = pair[0];

                    // Stop at the entry whose source column matches the current field
                    if (sourceColumn.equalsIgnoreCase(fieldName)) {
                        targetColumn = pair[1];// the key step
                        break;
                    }
                }

                // Date/time column conversion
                // Reset the date column name and type
                targetDateColumnName = "";
                targetDateColumnType = "";

                if (targetDateColumnsArray.length > 0) {
                    for (String ttc : targetDateColumnsArray) {
                        targetDateColumnName = ttc.split(":")[0];
                        targetDateColumnType = ttc.split(":")[1];

                        // Stop once the column being inserted is found among the date columns
                        if (targetDateColumnName.equals(targetColumn)) {
                            break;
                        } else {
                            // Not a match: reset the date column name and type
                            targetDateColumnName = "";
                            targetDateColumnType = "";
                        }
                    }
                } else {// columns.date was not set
                    // In that case, neither the target table nor the source table may contain date columns
                    // Reset the date column name and type
                    targetDateColumnName = "";
                    targetDateColumnType = "";
                }

                // A non-empty date column name means the column being inserted is a date/time type
                if (!"".equals(targetDateColumnName)) {
                    fieldNames.add(targetColumn);
                    boolean isNull = "".equals(fieldValue) || "null".equals(fieldValue) || null == fieldValue;
                    if (targetDataBaseName.equals("mysql")) {
                        if (sourceConnectorName.equals("mysql")) {
                            if (targetDateColumnType.equalsIgnoreCase("datetime")) {// target column is a MySQL datetime column
                                // mysql datetime
                                // io.debezium.time.Timestamp
                                fieldParams.add("STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                }
                            } else if (targetDateColumnType.equalsIgnoreCase("date")) {// target column is a MySQL date column
                                // mysql date
                                // io.debezium.time.Date
                                fieldParams.add("STR_TO_DATE(?, '%Y-%m-%d')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.getSomeDay(Integer.parseInt(fieldValue + "")));
                                }
                            } else if (targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL timestamp column
                                // mysql timestamp
                                // io.debezium.time.ZonedTimestamp
                                fieldParams.add("STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.fromISO8601((String) fieldValue));
                                }
                            } else {// mysql time
                                // io.debezium.time.MicroTime
                                fieldParams.add("TIME(?)");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.secondsToTime((Long) fieldValue / 1000000));
                                }
                            }
                        } else {// source table is Oracle
                            if (targetDateColumnType.equalsIgnoreCase("datetime") || targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL datetime or timestamp column
                                // oracle timestamp-->io.debezium.time.MicroTimestamp
                                // oracle date (with or without a time part)-->io.debezium.time.Timestamp
                                fieldParams.add("STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                }
                            } else {// target column is a MySQL date column
                                // oracle date
                                // io.debezium.time.Timestamp
                                fieldParams.add("STR_TO_DATE(?, '%Y-%m-%d')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC").substring(0, 10));
                                }
                            }
                        }
                    } else if (targetDataBaseName.equals("oracle")) {
                        if (sourceConnectorName.equals("mysql")) {
                            if (StringUtils.startsWithIgnoreCase(targetDateColumnType, "TIMESTAMP")) {
                                // io.debezium.time.ZonedTimestamp
                                fieldParams.add("TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.fromISO8601((String) fieldValue));
                                }
                            } else {// DATE
                                // MySQL to Oracle is fine here as well (the MySQL time type has already been excluded)
                                fieldParams.add("TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    if (String.valueOf(fieldValue).length() > 5) {// source type is datetime
                                        fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                    } else {// date
                                        fieldValues.add(DateUtils.getSomeDay(Integer.parseInt(fieldValue + "")));
                                    }
                                }
                            }
                        } else {// oracle
                            fieldParams.add("TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS')");
                            if (isNull) {
                                fieldValues.add(null);
                            } else {
                                fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                            }
                        }
                    }
                } else {// not a date/time column
                    // fieldNames.add(fieldName);
                    fieldNames.add(targetColumn);
                    fieldParams.add("?");
                    fieldValues.add(fieldValue);
                }
            }
            sql = "insert into " + tableName + "(" + String.join(",", fieldNames) + ")values(" + String.join(",", fieldParams) + ")";
            log.info(sql);
            this.connection.insert(sql, fieldValues.toArray());
        } else if (operation.equals(Operation.UPDATE.code())) {// u
            fieldNames = new ArrayList<>();
            fieldParams = new ArrayList<>();
            fieldValues = new ArrayList<>();

            // Source column name
            String sourceColumn;
            // Target column name
            String targetColumn;
            // Target date column name
            String targetDateColumnName;
            // Target date column type
            String targetDateColumnType;

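            // Compare the values before and after the change to decide which columns need updating
            // Note: the primary key must not change
            for (Field afterField : afterStruct.schema().fields()) {
                fieldName = afterField.name();
                fieldValue = afterStruct.get(fieldName);
                // A differing before/after value means the column was updated (MySQL emits all columns before and after the change; Oracle only emits the changed columns plus the primary key)
                if (!String.valueOf(fieldValue).equals(String.valueOf(beforeStruct.get(fieldName)))) {
                    // Default to the source column name when columns.map has no matching entry
                    targetColumn = fieldName;
                    // Map the source column to its target column (columns.map)
                    for (String tcm : targetColumnsMapArray) {
                        String[] pair = tcm.split(":");
                        if (pair.length < 2) continue;
                        sourceColumn = pair[0];

                        // Stop at the entry whose source column matches the column being updated
                        if (sourceColumn.equalsIgnoreCase(fieldName)) {
                            targetColumn = pair[1];
                            break;
                        }
                    }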

                    // Date/time column conversion
                    // Reset the date column name and type
                    targetDateColumnName = "";
                    targetDateColumnType = "";
                    for (String ttc : targetDateColumnsArray) {
                        targetDateColumnName = ttc.split(":")[0];
                        targetDateColumnType = ttc.split(":")[1];

                        // Stop once the column being updated is found among the date columns
                        if (targetDateColumnName.equals(targetColumn)) {
                            break;
                        } else {
                            // Not a match: reset the date column name and type
                            targetDateColumnName = "";
                            targetDateColumnType = "";
                        }
                    }

                    // A non-empty date column name means the column being updated is a date/time type
                    if (!"".equals(targetDateColumnName)) {
                        boolean isNull = "".equals(fieldValue) || "null".equals(fieldValue) || null == fieldValue;
                        if (targetDataBaseName.equals("mysql")) {
                            if (sourceConnectorName.equals("mysql")) {
                                if (targetDateColumnType.equalsIgnoreCase("datetime")) {// target column is a MySQL datetime column
                                    // mysql datetime
                                    // io.debezium.time.Timestamp
                                    fieldNames.add(targetColumn + "=STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                    }
                                } else if (targetDateColumnType.equalsIgnoreCase("date")) {// target column is a MySQL date column
                                    // mysql date
                                    // io.debezium.time.Date
                                    fieldNames.add(targetColumn + "=STR_TO_DATE(?, '%Y-%m-%d')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.getSomeDay((Integer) fieldValue));
                                    }
                                } else if (targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL timestamp column
                                    // mysql timestamp
                                    // io.debezium.time.ZonedTimestamp
                                    fieldNames.add(targetColumn + "=STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.fromISO8601((String) fieldValue));
                                    }
                                } else {
                                    // mysql time
                                    // io.debezium.time.MicroTime
                                    fieldNames.add(targetColumn + "=TIME(?)");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.secondsToTime((Long) fieldValue / 1000000));
                                    }
                                }
                            } else {// source table is Oracle
                                if (targetDateColumnType.equalsIgnoreCase("datetime") || targetDateColumnType.equalsIgnoreCase("timestamp")) {// target column is a MySQL datetime or timestamp
                                    // oracle timestamp-->io.debezium.time.MicroTimestamp
                                    // oracle date (with or without a time part)-->io.debezium.time.Timestamp
                                    fieldNames.add(targetColumn + "=STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                    }
                                } else {// target column is a MySQL date
                                    // oracle date
                                    // io.debezium.time.Timestamp
                                    fieldNames.add(targetColumn + "=STR_TO_DATE(?, '%Y-%m-%d')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC").substring(0, 10));
                                    }
                                }
                            }
                        } else if (targetDataBaseName.equals("oracle")) {
                            if (sourceConnectorName.equals("mysql")) {
                                if (StringUtils.startsWithIgnoreCase(targetDateColumnType, "TIMESTAMP")) {
                                    // io.debezium.time.ZonedTimestamp
                                    fieldNames.add(targetColumn + "=TO_TIMESTAMP(?, 'YYYY-MM-DD HH24:MI:SS')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        fieldValues.add(DateUtils.fromISO8601((String) fieldValue));
                                    }
                                } else {// DATE
                                    // Also works for MySQL-to-Oracle (MySQL time columns have already been excluded)
                                    fieldNames.add(targetColumn + "=TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS')");
                                    if (isNull) {
                                        fieldValues.add(null);
                                    } else {
                                        if (String.valueOf(fieldValue).length() > 5) {// source type is datetime (epoch milliseconds have more than 5 digits; a date is a day count)
                                            fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                        } else {// date
                                            fieldValues.add(DateUtils.getSomeDay(Integer.parseInt(fieldValue + "")));
                                        }
                                    }
                                }
                            } else {// oracle
                                fieldNames.add(targetColumn + "=TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS')");
                                if (isNull) {
                                    fieldValues.add(null);
                                } else {
                                    fieldValues.add(DateUtils.timestampToString((Long) fieldValue, "UTC"));
                                }
                            }
                        }
                    } else {// not a date/time column
                        // fieldNames.add(fieldName + "=?");
                        fieldNames.add(targetColumn + "=?");
                        fieldValues.add(fieldValue);
                    }
                }
            }

            // WHERE clause: update by primary key
            for (String pk : primaryFields.split(",")) {
                fieldParams.add(pk + "=?");
                for (Field f : afterStruct.schema().fields()) {
                    fieldName = f.name();

                    if (StringUtils.equalsIgnoreCase(fieldName, pk)) {
                        fieldValues.add(afterStruct.get(pk));
                        break;
                    }
                }

            }

            sql = "UPDATE " + tableName + " SET " + String.join(",", fieldNames) + " WHERE " + String.join(" AND ", fieldParams);
            log.info(sql);
            this.connection.update(sql, fieldValues.toArray());
        } else if (operation.equals(Operation.DELETE.code()) || operation.equals(Operation.TRUNCATE.code())) {// d or t
            fieldParams = new ArrayList<>();
            fieldValues = new ArrayList<>();

            // WHERE clause: delete by primary key
            for (String pk : primaryFields.split(",")) {
                fieldParams.add(pk + "=?");
                for (Field f : beforeStruct.schema().fields()) {
                    fieldName = f.name();

                    if (StringUtils.equalsIgnoreCase(fieldName, pk)) {
                        fieldValues.add(beforeStruct.get(pk));
                        break;
                    }
                }

            }

            // delete the record by primary key
            sql = "DELETE FROM " + tableName + " WHERE " + String.join(" AND ", fieldParams);
            log.info(sql);
            this.connection.update(sql, fieldValues.toArray());
        }

    }

    @Override
    public void stop() {

    }
}

This task class ultimately implements inserts, deletes, and updates for the following directions:

mysql-->mysql, mysql-->oracle, oracle-->mysql, and oracle-->oracle.
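
As a rough illustration of how the task class above assembles its UPDATE statement, here is a minimal sketch. The class and method names (`UpdateSqlSketch`, `dateTimeAssignment`, `buildUpdate`) and the table and column names are hypothetical and are not part of the original project; only the SQL fragments mirror the ones used above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the original project. */
final class UpdateSqlSketch {

    /** Returns the SET fragment for a date-time column, chosen by target database. */
    static String dateTimeAssignment(String targetDb, String column) {
        if ("mysql".equals(targetDb)) {
            return column + "=STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')";
        }
        if ("oracle".equals(targetDb)) {
            return column + "=TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS')";
        }
        throw new IllegalArgumentException("Unsupported target database: " + targetDb);
    }

    /** Assembles a parameterized UPDATE keyed on the primary-key columns. */
    static String buildUpdate(String table, List<String> assignments, List<String> primaryKeys) {
        List<String> where = new ArrayList<>();
        for (String pk : primaryKeys) {
            where.add(pk + "=?");
        }
        return "UPDATE " + table + " SET " + String.join(",", assignments)
                + " WHERE " + String.join(" AND ", where);
    }

    public static void main(String[] args) {
        List<String> assignments = new ArrayList<>();
        assignments.add("NAME=?"); // ordinary column: plain placeholder
        assignments.add(dateTimeAssignment("oracle", "CREATE_TIME")); // date column: wrapped in a conversion function
        System.out.println(buildUpdate("T_USER", assignments, List.of("ID")));
    }
}
```

The values bound to the placeholders must be added in the same order: first the SET columns, then the primary-key columns.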

Other classes

DynamicDataSource.java

import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.*;
import javax.sql.DataSource;
/**
 * Dynamically configured data sources
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 16:03
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
@Slf4j
public class DynamicDataSource {
    private static final ConcurrentHashMap<String, DataSource> dataSources = new ConcurrentHashMap<>();
    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile DynamicDataSource instance = null;
    private static final Object lock = new Object();
    static int index = 1;

    private DynamicDataSource() {
    }

    private static DynamicDataSource getInstance() {
        if (instance == null) {
            synchronized(DynamicDataSource.class) {
                if (instance == null) {
                    instance = new DynamicDataSource();
                }
            }
        }

        return instance;
    }

    private DataSource getDataSource(String url, String userName, String pwd, String driveClassName, int maxActive) {
        String dataSourceString = url + userName + pwd + driveClassName;
        String hashCode = "ds" + dataSourceString.hashCode();
        if (!dataSources.containsKey(hashCode)) {
            synchronized(lock) {
                if (!dataSources.containsKey(hashCode)) {
                    HikariDataSource ds = new HikariDataSource();
                    ds.setJdbcUrl(url);
                    ds.setUsername(userName);
                    ds.setPassword(pwd);
                    ds.setDriverClassName(driveClassName);
                    ds.setMaximumPoolSize(maxActive);
                    dataSources.put(hashCode, ds);
                    log.debug("DataSource Key:" + hashCode);
                }
            }
        }

        return dataSources.get(hashCode);
    }

    public static DynamicConnection getDynamicConnection(String url, String userName, String pwd, String driveClassName) {
        return getDynamicConnection(url, userName, pwd, driveClassName, 50);
    }

    public static DynamicConnection getDynamicConnection(String url, String userName, String pwd, String driveClassName, int maxActive) {
        DynamicDataSource dynamicDataSource = getInstance();
        DataSource ds = dynamicDataSource.getDataSource(url, userName, pwd, driveClassName, maxActive);
        return new DynamicConnection(ds);
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);

        for(int i = 0; i < 100; ++i) {
            completionService.submit(() ->{
                DynamicConnection conn = code.marydon.db.DynamicDataSource.getDynamicConnection("jdbc:mysql://192.168.0.142:3306/platform60", "root", "Xyh@3613571", "com.mysql.cj.jdbc.Driver");
                List result = conn.queryForList("select * from base_ac_user limit 1,5");
                Integer[] params = new Integer[]{1, 7};
                DynamicConnection conn1 = code.marydon.db.DynamicDataSource.getDynamicConnection("jdbc:mysql://192.168.0.142:3306/xyhplat", "root", "Xyh@3613571", "com.mysql.cj.jdbc.Driver");
                List result1 = conn1.queryForList("select * from base_ac_user limit ?,?", params);
                System.out.println(code.marydon.db.DynamicDataSource.index++ + ":" + result.size() + "," + result1.size());
                return null;
            });
        }

    }

}
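
The "create the pool once per connection string" idea in `getDataSource` can also be expressed with `ConcurrentHashMap.computeIfAbsent`. The sketch below is only an illustration under assumptions: `PoolCacheSketch` and its `Supplier` factory are hypothetical stand-ins for the HikariCP data source, and the key here is the concatenated string itself rather than its hash code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical generic cache standing in for the data-source map; illustration only. */
final class PoolCacheSketch<T> {
    private final ConcurrentHashMap<String, T> pools = new ConcurrentHashMap<>();

    /** Creates the pool for a key at most once, even under concurrent calls. */
    T getOrCreate(String url, String user, String driver, Supplier<T> factory) {
        // The full string is the key, so two different configurations can never collide.
        String key = url + "|" + user + "|" + driver;
        return pools.computeIfAbsent(key, k -> factory.get());
    }

    int size() {
        return pools.size();
    }

    public static void main(String[] args) {
        PoolCacheSketch<Object> cache = new PoolCacheSketch<>();
        AtomicInteger created = new AtomicInteger();
        Supplier<Object> factory = () -> {
            created.incrementAndGet(); // counts how often a "pool" is really built
            return new Object();
        };
        Object a = cache.getOrCreate("jdbc:mysql://host/db1", "app", "com.mysql.cj.jdbc.Driver", factory);
        Object b = cache.getOrCreate("jdbc:mysql://host/db1", "app", "com.mysql.cj.jdbc.Driver", factory);
        System.out.println(a == b);        // same cached instance
        System.out.println(created.get()); // factory ran once
    }
}
```

Using the string as the key instead of `hashCode()` avoids the case where two different connection strings happen to share a hash value.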

DynamicConnection.java

import lombok.extern.slf4j.Slf4j;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
/**
 * Dynamic database connection
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 16:06
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
@Slf4j
public class DynamicConnection {
    private DataSource ds;

    public DynamicConnection(DataSource ds) {
        this.ds = ds;
    }

    public List<Map<String, Object>> queryForList(String sql) throws SQLException {
        Connection conn = null;
        Statement statement = null;
        ResultSet rs = null;
        try {
            conn = this.ds.getConnection();
            statement = conn.createStatement();
            rs = statement.executeQuery(sql);
            return this.rsToList(rs);
        } catch (Exception var10) {
            throw new RuntimeException(var10);
        } finally {
            closeDB(conn, statement, rs, null);
        }
    }

    public List<Map<String, Object>> queryForList(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);

            for(int i = 0; i < params.length; ++i) {
                ps.setObject(i + 1, params[i]);
            }

            rs = ps.executeQuery();
            return this.rsToList(rs);
        } finally {
            closeDB(conn, ps, rs, null);
        }
    }

    private List<Map<String, Object>> rsToList(ResultSet rs) throws SQLException {
        List<Map<String, Object>> list = new ArrayList<>();
        ResultSetMetaData rsmd = rs.getMetaData();
        int rowCount = rsmd.getColumnCount();
        Map<String, Object> map;

        while(rs.next()) {
            map = new HashMap<>(rowCount);

            for(int i = 1; i <= rowCount; ++i) {
                map.put(rsmd.getColumnName(i), rs.getObject(i));
            }

            list.add(map);
        }

        return list;
    }

    public boolean insert(String sql) throws SQLException {
        Connection conn = null;
        Statement statement = null;
        try {
            conn = this.ds.getConnection();
            statement = conn.createStatement();
            return statement.execute(sql);
        } finally {
            closeDB(conn, statement, null, null);
        }
    }

    public boolean insert(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);

            for(int i = 0; i < params.length; ++i) {
                ps.setObject(i + 1, params[i]);
            }

            return ps.execute();
        } finally {
            closeDB(conn, ps, null, null);
        }
    }

    public boolean executeBatch(String sql, List<Object[]> list) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = this.ds.getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(sql);

            for(int i = 0; i < list.size(); ++i) {
                Object[] params = list.get(i);

                for(int j = 0; j < params.length; ++j) {
                    ps.setObject(j + 1, params[j]);
                }

                ps.addBatch();
                // flush after every 300 rows (i + 1 avoids flushing a single row at i == 0)
                if ((i + 1) % 300 == 0) {
                    ps.executeBatch();
                    ps.clearBatch();
                }
            }

            // flush whatever is left, then commit everything together
            ps.executeBatch();
            ps.clearBatch();
            conn.commit();
            return true;
        } catch (Exception e) {
            log.error("Batch execution failed, rolling back: {}", e.getMessage());
            if (conn != null) {
                conn.rollback();
            }
            return false;
        } finally {
            // closeDB closes the statement and hands the connection back to the pool
            closeDB(conn, ps, null, null);
        }

    }

    public int update(String sql) throws SQLException {
        Connection conn = null;
        Statement statement = null;
        try {
            conn = this.ds.getConnection();
            statement = conn.createStatement();
            return statement.executeUpdate(sql);
        } finally {
            closeDB(conn, statement, null, null);
        }
    }

    public int update(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);

            // bind parameters (JDBC parameter indexes start at 1)
            for(int i = 0; i < params.length; ++i) {
                ps.setObject(i + 1, params[i]);
            }

            return ps.executeUpdate();
        } finally {
            closeDB(conn, ps, null, null);
        }
    }

    public boolean execProcedure(String sql) throws SQLException {
        return this.execProcedure(sql, new Object[0]);
    }

    public boolean execProcedure(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        CallableStatement cs = null;

        try {
            conn = this.ds.getConnection();
            cs = conn.prepareCall(sql);

            for(int i = 0; i < params.length; ++i) {
                cs.setObject(i + 1, params[i]);
            }

            return cs.execute();
        } finally {
            closeDB(conn, null, null, cs);
        }
    }

    public List<Map<String, Object>> queryProcedure(String sql) throws SQLException {
        return this.queryProcedure(sql, new Object[0]);
    }

    public List<Map<String, Object>> queryProcedure(String sql, Object[] params) throws SQLException {
        Connection conn = null;
        CallableStatement cs = null;
        ResultSet rs = null;

        try {
            conn = this.ds.getConnection();
            log.debug("Connection acquired");
            cs = conn.prepareCall(sql);
            if (params != null) {
                for(int i = 0; i < params.length; ++i) {
                    cs.setObject(i + 1, params[i]);
                }
            }

            rs = cs.executeQuery();
            log.debug("Statement executed, sql: " + sql);
            return this.rsToList(rs);
        } finally {
            log.debug("Returning connection to the pool");
            closeDB(conn, null, rs, cs);
        }
    }

    public List<Map<String, Object>> queryProcedureMoreRes(String sql) throws SQLException {
        return this.queryProcedure(sql);
    }

    public List<String> getColumnNames(String sql) throws SQLException {
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<String> columns = new ArrayList<>();

        try {
            conn = this.ds.getConnection();
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            ResultSetMetaData rsmd = rs.getMetaData();

            for(int i = 1; i <= rsmd.getColumnCount(); ++i) {
                columns.add(rsmd.getColumnName(i));
            }

            return columns;
        } finally {
            closeDB(conn, ps, rs, null);
        }
    }

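    /**
     * Closes JDBC resources and returns the connection to the pool
     */
    public void closeDB(Connection conn, Statement ps, ResultSet rs, CallableStatement cs) {
        try {
            // close the result set first, then the statements that produced it
            if (null != rs && !rs.isClosed()) {
                rs.close();
            }
            if (null != ps && !ps.isClosed()) {
                ps.close();
            }
            if (null != cs && !cs.isClosed()) {
                cs.close();
            }

            if (null != conn && !conn.isClosed()) {
                // for a pooled connection, close() hands it back to the pool
                conn.close();
                log.debug("Connection returned to the pool");
            }
        } catch (SQLException e) {
            e.printStackTrace();
            log.error("Failed to close database resources: {}", e.getMessage());
        }
    }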
}
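
One way to reason about chunked flushing such as the 300-row batches in `executeBatch` is to count pending rows explicitly. The following sketch is hypothetical (`BatchFlushSketch` and `flushSizes` do not exist in the original project) and uses plain counters in place of the JDBC calls, so it only models the arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of chunked batch flushing; counters stand in for JDBC calls. */
final class BatchFlushSketch {

    /** Returns the sizes of the batches that would be sent for rowCount rows. */
    static List<Integer> flushSizes(int rowCount, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        int pending = 0;
        for (int i = 0; i < rowCount; i++) {
            pending++;                  // stands in for ps.addBatch()
            if (pending == batchSize) { // stands in for ps.executeBatch()
                sizes.add(pending);
                pending = 0;
            }
        }
        if (pending > 0) {
            sizes.add(pending);         // final partial batch
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(flushSizes(650, 300)); // prints [300, 300, 50]
    }
}
```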

DateUtils.java


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;

/**
 * Date utility class
 * @description: converts between date formats
 * @author: Marydon
 * @date: 2023-12-07 11:48
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
public class DateUtils {
    // Date (uppercase M is the month, lowercase m is the minute)
    public static final String FORMAT_DATE = "yyyy-MM-dd";
    // Time (uppercase H is the 24-hour clock, lowercase h is the 12-hour clock)
    public static final String FORMAT_TIME = "HH:mm:ss";
    // Date and time
    public static final String FORMAT_DATE_TIME = FORMAT_DATE + " " + FORMAT_TIME;
    // Time zone
    // UTC+8: GMT+8/UTC+8
    public static final String FORMAT_TIME_ZONE = "Asia/Shanghai";


    /**
     * Converts an epoch timestamp to a date-time string
     * @param timestamp epoch milliseconds
     * @return formatted date-time string
     */
    public static String timestampToString(Long timestamp) {
        return toDateTimeString(fromTimeMills(timestamp), FORMAT_DATE_TIME);
    }

    public static String timestampToString(Long timestamp, String timeZone) {
        if (String.valueOf(timestamp).length() == 16) {// 16 digits: microseconds (io.debezium.time.MicroTimestamp)
            return toDateTimeString(fromTimeMills(timestamp / 1000, timeZone), FORMAT_DATE_TIME);
        } else {// 13 digits: milliseconds (io.debezium.time.Timestamp)
            return toDateTimeString(fromTimeMills(timestamp, timeZone), FORMAT_DATE_TIME);
        }
    }

    /**
     * Formats a LocalDateTime as a string (date + time)
     * @attention: jdk>=1.8
     * @date: 2020-08-31 17:04
     * @param: dateTime
     * @param: pattern
     * @return: java.lang.String
     */
    public static String toDateTimeString(LocalDateTime dateTime, String pattern) {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(pattern);
        return dateTimeFormatter.format(dateTime);
    }

    /**
     * Converts epoch milliseconds to a LocalDateTime
     * @attention: jdk>=1.8
     * @date: 2020-09-10 11:22
     * @param: timeMills
     * @return: java.time.LocalDateTime
     */
    public static LocalDateTime fromTimeMills(long timeMills){
        return fromTimeMills(timeMills, FORMAT_TIME_ZONE);
    }

    public static LocalDateTime fromTimeMills(long timeMills, String timeZone){
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timeMills), ZoneId.of(timeZone));
    }

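    /**
     * Parses an ISO 8601 date-time string into a LocalDateTime
     * @attention:
     * @date: 2021/7/28 15:05
     * @param: dateTimeStr the date-time string, e.g.
     * 2023-12-07T16:00:00Z
     * "2023-12-07T16:00:00Z" is an ISO 8601 date-time representation.
     * It denotes one specific instant: 7 December 2023, 4 p.m. (16:00), 0 minutes, 0 seconds. "T" separates the date from the time, and "Z" stands for Coordinated Universal Time (UTC).
     * This format is a timestamp with second precision.
     * @return: java.time.LocalDateTime
     */
    public static LocalDateTime toLocalDateTime(String dateTimeStr) {
        // the input is in UTC
        DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
        ZonedDateTime zonedDateTime = ZonedDateTime.parse(dateTimeStr, formatter);
        // UTC-->shifted back by a fixed 6 hours; the offset is hard-coded and presumably environment-specific, so verify it against your own databases
        return zonedDateTime.toLocalDateTime().plusHours(-6);
    }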

    /**
     * Converts an ISO 8601 date-time string to a formatted string
     * @param dateTimeStr e.g. 2023-12-07T16:00:00Z (see toLocalDateTime for the format)
     * @return date-time string in yyyy-MM-dd HH:mm:ss format
     */
    public static String fromISO8601(String dateTimeStr) {
        return toDateTimeString(toLocalDateTime(dateTimeStr), FORMAT_DATE_TIME);
    }

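    /**
     * Derives a date from a number of days since 1970-01-01
     * https://www.cnblogs.com/Marydon20170307/p/10672030.html
     * @param days days since 1970-01-01
     * @return date string in yyyy-MM-dd format
     */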
    public static String getSomeDay(int days){
        SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_DATE);
        Date date;
        try {
            date = sdf.parse("1970-01-01");
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        calendar.add(Calendar.DAY_OF_YEAR, days);
        date = calendar.getTime();
        return sdf.format(date);
    }

    /**
     * Derives a time of day from a number of seconds
     * @param seconds e.g. 77400
     * @return HH:mm:ss
     * e.g. 21:30:00
     */
    public static String secondsToTime(Long seconds){
        String dateTime = String.format("%d:%d:%d", seconds / 3600, (seconds % 3600) / 60, seconds % 60);
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        Date date;
        try {
            date = sdf.parse(dateTime);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
        return sdf.format(date);
    }

    public static void main(String[] args) {
        // mysql
        System.out.println(timestampToString(1702027934000L,"UTC"));
        System.out.println(fromISO8601("2023-12-08T15:32:19Z"));
        System.out.println(getSomeDay(19699));
        // System.out.println(secondsToTime(34419000000L / 1000000));
        // oracle
        // System.out.println(timestampToString(1702837490000L,"UTC"));
        // System.out.println(timestampToString(1702826697000000L,"UTC"));
        System.out.println(secondsToTime(77400L));
    }
}
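
The same conversions can be sketched with `java.time` alone, without `SimpleDateFormat` or digit counting. This is a minimal sketch under assumptions, not the author's implementation: `DebeziumTimeSketch` and its method names are hypothetical, every value is rendered in UTC, and the fixed hour shift applied by `toLocalDateTime` above is not reproduced.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/** Hypothetical java.time-based conversions for Debezium temporal values; illustration only. */
final class DebeziumTimeSketch {
    private static final DateTimeFormatter DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");

    /** io.debezium.time.Date: days since 1970-01-01. */
    static String epochDaysToDate(int days) {
        return LocalDate.ofEpochDay(days).toString();
    }

    /** io.debezium.time.Timestamp: milliseconds since the epoch, rendered in UTC. */
    static String epochMillisToDateTime(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC).format(DATE_TIME);
    }

    /** io.debezium.time.MicroTimestamp: microseconds since the epoch, rendered in UTC. */
    static String epochMicrosToDateTime(long micros) {
        return epochMillisToDateTime(TimeUnit.MICROSECONDS.toMillis(micros));
    }

    /** io.debezium.time.MicroTime: microseconds since midnight. */
    static String microsOfDayToTime(long micros) {
        return LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(micros)).format(TIME);
    }

    public static void main(String[] args) {
        System.out.println(epochDaysToDate(19699));                  // 2023-12-08
        System.out.println(epochMillisToDateTime(1702027934000L));   // 2023-12-08 09:32:14
        System.out.println(epochMicrosToDateTime(1702826697000000L)); // 2023-12-17 15:24:57
        System.out.println(microsOfDayToTime(34419000000L));         // 09:33:39
    }
}
```

Note that `LocalTime.ofNanoOfDay` rejects values outside a single day, so negative or multi-day MySQL `TIME` values would need separate handling.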

Operation.java

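/**
 * Database operation codes
 * @description:
 * @author: Marydon
 * @date: 2023-12-06 17:37
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
public enum Operation {
    // read (r): a row read during a snapshot; the payload contains only the after image, and before is null
    READ("r"),
    // create (c): a row was inserted (not a table creation); the payload contains only the after image
    CREATE("c"),
    // update (u): a row was updated; the payload contains both the before image (the row before the update) and the after image (the full row after the update)
    UPDATE("u"),
    // delete (d): a row was deleted; the payload contains only the before image, and after is null
    DELETE("d"),
    // truncate (t): the table was truncated
    TRUNCATE("t"),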

    MESSAGE("m");
    private final String code;

    Operation(String code) {
        this.code = code;
    }

    public String code() {
        return this.code;
    }
}
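
The task class compares the incoming `op` string against `Operation.X.code()` one branch at a time. A reverse lookup from code to constant is another option. The sketch below is hypothetical: `OperationLookupSketch`, its nested `Op` enum (a copy of the constants above), and `forCode` are illustrative names only.

```java
import java.util.Optional;

/** Hypothetical reverse lookup from a Debezium op code to an enum constant; illustration only. */
final class OperationLookupSketch {

    /** Mirrors the constants of the Operation enum above. */
    enum Op {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d"), TRUNCATE("t"), MESSAGE("m");

        private final String code;

        Op(String code) {
            this.code = code;
        }

        String code() {
            return code;
        }
    }

    /** Returns the constant whose code matches, or empty for an unknown code. */
    static Optional<Op> forCode(String code) {
        for (Op op : Op.values()) {
            if (op.code().equals(code)) {
                return Optional.of(op);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(forCode("u"));             // Optional[UPDATE]
        System.out.println(forCode("x").isPresent()); // false
    }
}
```

With such a lookup, the sink could `switch` on the enum constant and handle unknown codes in one place.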

TimeZoneValidator.java

import java.util.Arrays;
import java.util.TimeZone;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigDef.Validator;

/**
 * Time zone validation
 * @description:
 * @author: Marydon
 * @date: 2023-12-07 10:06
 * @version: 1.0
 * @email: marydon20170307@163.com
 */
public class TimeZoneValidator implements Validator {
    public static final TimeZoneValidator INSTANCE = new TimeZoneValidator();

    public TimeZoneValidator() {
    }

    public void ensureValid(String name, Object value) {
        if (value != null && !Arrays.asList(TimeZone.getAvailableIDs()).contains(value.toString())) {
            throw new ConfigException(name, value, "Invalid time zone identifier");
        }
    }

    public String toString() {
        return "Any valid JDK time zone";
    }

    public static void main(String[] args) {
        // Sample output (abridged): [Africa/Abidjan, Africa/Accra, ..., Asia/Shanghai, ..., UTC, ..., PST, SST, VST]
        System.out.println(Arrays.toString(TimeZone.getAvailableIDs()));
    }
}

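4. Packaging

Package the project so that it can be deployed as a plugin.

Run Maven clean first, then package (for example, mvn clean package).

Create a folder named marydon-connector-jdbc-1.0 and copy the generated jar and the entire lib directory into it.

Notes:

marydon-connector-jdbc-1.0.Final.jar is the Sink Connector plugin we built.

The jars in the lib directory are the dependencies the plugin needs at runtime.

Plugin name: the fully qualified class name of JdbcSinkConnector, i.e. code.marydon.connectors.JdbcSinkConnector.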

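5. Usage

The plugin is packaged. How do we use it?

Copy the marydon-connector-jdbc-1.0 folder into the KAFKA_HOME/plugins directory (this directory must be covered by the plugin.path setting of the Kafka Connect worker).

Then start zookeeper, kafka and kafka connect, in that order.

Create the producer first, then create the consumer that uses this plugin.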

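6. Demo

Source database: mysql. Target database: oracle. The demo shows real-time synchronization of inserts, deletes and updates from mysql to oracle.

Prerequisite: start zookeeper, kafka and kafka connect, in that order.

List the available plugins

Confirm that code.marydon.connectors.JdbcSinkConnector is present.

Request URL (GET):

http://localhost:8083/connector-plugins

 

If the returned plugin list contains:

{"class":"code.marydon.connectors.JdbcSinkConnector","type":"sink","version":"1.0.Final"}

then the plugin we packaged has been loaded and is available.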
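The same check can be scripted. The following is a minimal sketch, assuming Java 11's built-in HTTP client and a Kafka Connect worker on localhost:8083; the class PluginCheck and its methods are hypothetical helpers that are not part of the plugin, and the substring test assumes the compact JSON layout shown above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper (not part of the plugin): checks whether Kafka Connect lists a connector class. */
final class PluginCheck {

    /** Builds the GET request for the worker's plugin list. */
    static HttpRequest listPluginsRequest(String connectUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connector-plugins"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    /** Plain substring check on the "class" entry; a real client would parse the JSON instead. */
    static boolean isListed(String responseBody, String connectorClass) {
        return responseBody.contains("\"class\":\"" + connectorClass + "\"");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(listPluginsRequest("http://localhost:8083"), HttpResponse.BodyHandlers.ofString());
        System.out.println(isListed(response.body(), "code.marydon.connectors.JdbcSinkConnector"));
    }
}
```

Running main against a live worker prints true when the plugin was picked up and false otherwise.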

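Query the available parameters

List the parameters accepted by the code.marydon.connectors.JdbcSinkConnector plugin.

Request URL (GET):

http://localhost:8083/connector-plugins/code.marydon.connectors.JdbcSinkConnector/config

Every parameter registered through ConfigDef.define() is listed here; these are the settings that callers can supply.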

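Capturing data

Use the io.debezium.connector.mysql.MySqlConnector plugin to capture the mysql data and publish it to kafka.

Structure of the mysql source table t_patient_zs.

Source table data

Clear the target table

Create the capture connector

Request URL (POST):

http://localhost:8083/connectors

Request body: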

{
  "name": "debezium-connector-source-70",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.allowPublicKeyRetrieval": "true",
    "database.user": "username",
    "database.server.id": "70",
    "tasks.max": "1",
    "database.server.name": "source-db_name-70",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "column.include.list": "db_name.t_patient_zs.CREATE_TIME,db_name.t_patient_zs.ID,db_name.t_patient_zs.JZ_TIME,db_name.t_patient_zs.PATIENT_ID,db_name.t_patient_zs.UPDATE_TIME,db_name.t_patient_zs.ZS_ID",
    "database.port": "3306",
    "schema.history.internal.store.only.captured.tables.ddl": "true",
    "include.schema.changes": "true",
    "inconsistent.schema.handling.mode": "warn",
    "topic.prefix": "topic-db_name-70",
    "schema.history.internal.kafka.topic": "schema-history-db_name-70",
    "database.hostname": "192.168.0.1",
    "database.password": "password",
    "table.include.list": "db_name.t_patient_zs",
    "database.include.list": "db_name",
    "snapshot.mode": "initial"
  }
}
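For readers who prefer to submit the request from code rather than from an HTTP tool, here is a minimal sketch, assuming Java 11's built-in HTTP client and a worker on localhost:8083. The class ConnectorRegistration is a hypothetical helper that is not part of the plugin, and the JSON body is read from a file whose path is passed as the first argument.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper (not part of the plugin): registers a connector through the Kafka Connect REST API. */
final class ConnectorRegistration {

    /** Builds a POST to /connectors carrying the connector definition as JSON. */
    static HttpRequest createConnectorRequest(String connectUrl, String connectorJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // args[0]: path to a file holding the request body (name + config), an assumption of this sketch
        String json = Files.readString(Path.of(args[0]));
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(createConnectorRequest("http://localhost:8083", json), HttpResponse.BodyHandlers.ofString());
        // Print the status code and body so that failures are visible
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

A 201 status normally means the connector was created, while a 409 usually indicates that a connector with the same name already exists.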

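Looking at the topic contents shows that the 3 rows of the table have been published to kafka.

Note: I use EFAK, which lets me view a topic's data directly; you can also consume the topic yourself to see the data waiting to be synchronized.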

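Synchronizing data

Use the code.marydon.connectors.JdbcSinkConnector plugin to subscribe to the mysql data in kafka and synchronize it to oracle.

Structure of the oracle target table t_patient_zs.

Create the sink connector

Request URL (POST):

http://localhost:8083/connectors

Request body: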

{
  "name": "marydon-connector-sink-70-t_patient_zs",
  "config": {
    "connector.class": "code.marydon.connectors.JdbcSinkConnector",
    "table.name.format": "T_PATIENT_ZS",
    "connection.password": "password",
    "tasks.max": "1",
    "topics.regex": "topic-db_name-70.db_name.t_patient_zs",
    "delete.enabled": "true",
    "columns.map": "CREATE_TIME:CREATE_TIME,ID:ID,JZ_TIME:JZ_TIME,PATIENT_ID:PATIENT_ID,UPDATE_TIME:UPDATE_TIME,ZS_ID:ZS_ID",
    "auto.evolve": "false",
    "connection.user": "username",
    "auto.create": "true",
    "connection.url": "jdbc:oracle:thin:@192.168.0.1:1521/orcl",
    "connection.driveClassName": "oracle.jdbc.driver.OracleDriver",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "columns.date": "CREATE_TIME:DATE,JZ_TIME:TIMESTAMP(6),UPDATE_TIME:DATE"
  }
}
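One detail of topics.regex is worth illustrating: the value is a regular expression, so each "." in the topic name matches any character, not only a literal dot. The sketch below uses java.util.regex directly with a hypothetical database name db_name standing in for the placeholder; TopicRegexDemo is an illustrative class and not part of the plugin, and it assumes the whole topic name must match the pattern.

```java
import java.util.regex.Pattern;

/** Illustrative only: shows how an unescaped '.' in topics.regex behaves. */
final class TopicRegexDemo {

    /** True when the whole topic name matches the regular expression. */
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    /** Returns a pattern that matches exactly this topic name and nothing else. */
    static String literal(String topic) {
        return Pattern.quote(topic);
    }

    public static void main(String[] args) {
        String topic = "topic-db_name-70.db_name.t_patient_zs";
        String lookalike = "topic-db_name-70xdb_namext_patient_zs";

        System.out.println(matches(topic, topic));              // the intended topic matches
        System.out.println(matches(topic, lookalike));          // '.' also matches the 'x' characters
        System.out.println(matches(literal(topic), lookalike)); // the quoted pattern rejects the lookalike
    }
}
```

In practice the unescaped form works as long as no other topic differs only at the dot positions; escaping the dots (written as \\. inside the JSON string) removes that ambiguity.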

The kafka connect console prints the insert into statements.

Check the synchronization result in Oracle.

With that, the initial data synchronization is complete.
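The columns.date setting in the sink request pairs each date column with its target type in COLUMN:TYPE form. How the plugin reads this value is defined in its own source; the sketch below is only an assumption of what such parsing could look like, with ColumnTypeMapping as a hypothetical class name. It splits on commas, so it fits the date types used here but not types that contain a comma.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for a "COLUMN:TYPE,COLUMN:TYPE" specification such as columns.date. */
final class ColumnTypeMapping {

    /** Returns the column-to-type pairs in declaration order. */
    static Map<String, String> parse(String spec) {
        Map<String, String> result = new LinkedHashMap<>();
        if (spec == null || spec.isBlank()) {
            return result;
        }
        for (String entry : spec.split(",")) {
            // Split on the first colon only, so the type part is kept intact
            int colon = entry.indexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Expected COLUMN:TYPE but got: " + entry);
            }
            result.put(entry.substring(0, colon).trim(), entry.substring(colon + 1).trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> types = parse("CREATE_TIME:DATE,JZ_TIME:TIMESTAMP(6),UPDATE_TIME:DATE");
        types.forEach((column, type) -> System.out.println(column + " -> " + type));
    }
}
```

A sink task could consult such a map when binding values, choosing a date or timestamp setter for the listed columns instead of binding them as strings.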

Insert example

Add a row in mysql.

kafka connect console output

The new row in the oracle table

 

Update example

Modify the row with id=2 in the mysql table.

kafka connect console output

Oracle

 

Delete example

Delete the row with id=4.

kafka connect console output

Oracle

 

Final note

  If you spot a mistake in this article or think something should be added, please leave a comment!
