Installing Flink and a simple example of using Flink CDC to import data from multiple sources into Elasticsearch

Published: 2023-12-27 12:08:38  Author: 2022——new_start

This document follows the example in the Flink CDC documentation ( https://ververica.github.io/flink-cdc-connectors/master/content/快速上手/mysql-postgres-tutorial-zh.html ) as a hands-on best-practice walkthrough.

1. Download the latest Flink release (1.18.0) and extract it.

Download the Flink SQL Elasticsearch connector jar: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
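A minimal shell sketch of the download and extract steps (the Flink binary URL assumes the standard Apache archive layout and the Scala 2.12 build; adjust it to the mirror and version you actually use):

    # download and extract Flink 1.18.0 (URL assumes the Apache archive layout)
    wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
    tar -xzf flink-1.18.0-bin-scala_2.12.tgz
    cd flink-1.18.0

    # download the Flink SQL Elasticsearch 7 connector jar straight into lib/
    wget -P lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar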

 

Clone the flink-cdc master branch from GitHub and build it.

 

Run mvn clean install -DskipTests to compile; the build produces the connector jars.
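A sketch of the clone-and-build step (the repository URL is the upstream ververica project this tutorial is built on; pick the branch or tag that matches your Flink version):

    # clone and build the Flink CDC connectors
    git clone https://github.com/ververica/flink-cdc-connectors.git
    cd flink-cdc-connectors
    mvn clean install -DskipTests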

 

 

Copy the built jars into the Flink lib directory.
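For example (paths are assumptions based on the usual module layout of the CDC project; the exact jar names depend on the version you built):

    # copy the MySQL and Postgres CDC SQL connector jars into Flink's lib directory
    cp flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-*.jar flink-1.18.0/lib/
    cp flink-cdc-connectors/flink-sql-connector-postgres-cdc/target/flink-sql-connector-postgres-cdc-*.jar flink-1.18.0/lib/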

 

Start the Flink cluster: bin/start-cluster.sh

 

Open the Flink web UI at http://localhost:8081/#/job-manager/logs


One thing to note: whenever you add jars to lib or change the configuration, make sure stop-cluster.sh has actually stopped the cluster before restarting. I have run stop-cluster.sh and found the web UI still reachable afterwards.
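A quick way to verify the cluster is really down (a sketch; the process names are the standard ones for a Flink standalone session cluster):

    bin/stop-cluster.sh
    # no Flink JVMs should be left; both names below belong to a standalone session cluster
    jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner' || echo "cluster fully stopped"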

Write the docker-compose file as described in the tutorial:

version: '2.1'
services:
  postgres:
    image: debezium/example-postgres:1.1
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mysql:
    image: debezium/example-mysql:1.1
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  elasticsearch:
    image: elastic/elasticsearch:7.6.0
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: elastic/kibana:7.6.0
    ports:
      - "5601:5601"

  Start the Docker containers:

    docker-compose up -d

   Check the startup status: if all containers are in the Up state, startup succeeded.
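  A simple way to check (no assumptions beyond the compose file above):

    # all compose services should show an "Up" state
    docker-compose ps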

Prepare the MySQL data

Enter the MySQL container:

docker-compose exec mysql mysql -uroot -p123456

 

  1. Create the database mydb and the tables products and orders, and insert data (following the original tutorial):

    -- MySQL
    CREATE DATABASE mydb;
    USE mydb;
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;
    
    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
           (default,"car battery","12V car battery"),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
           (default,"hammer","12oz carpenter's hammer"),
           (default,"hammer","14oz carpenter's hammer"),
           (default,"hammer","16oz carpenter's hammer"),
           (default,"rocks","box of assorted rocks"),
           (default,"jacket","water resistent black wind breaker"),
           (default,"spare tire","24 inch spare tire");
    
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

    Prepare the data in the Postgres database

    1. Enter the Postgres container:

      docker-compose exec postgres psql -h localhost -U postgres

       

      1. Create the shipments table and insert data:

        -- PG
        CREATE TABLE shipments (
          shipment_id SERIAL NOT NULL PRIMARY KEY,
          order_id SERIAL NOT NULL,
          origin VARCHAR(255) NOT NULL,
          destination VARCHAR(255) NOT NULL,
          is_arrived BOOLEAN NOT NULL
        );
        ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO shipments
        VALUES (default,10001,'Beijing','Shanghai',false),
               (default,10002,'Hangzhou','Shanghai',false),
               (default,10003,'Shanghai','Hangzhou',false);
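      Note that the postgres-cdc connector used later reads changes through logical decoding and a replication slot (the slot.name in the DDL below), which requires wal_level=logical on the Postgres side. The debezium/example-postgres image ships with this already enabled; a quick check from psql:

        -- PG: logical decoding must be enabled for the postgres-cdc connector
        SHOW wal_level;   -- should return 'logical'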
       


    cd into the Flink home directory and start the Flink SQL client:

     bin/sql-client.sh

    Create tables in the Flink SQL CLI using Flink DDL

    First, enable checkpointing so that a checkpoint is taken every 3 seconds:

    -- Flink SQL                   
    Flink SQL> SET execution.checkpointing.interval = 3s;
    

    Then, for the database tables products, orders, and shipments, create corresponding tables in the Flink SQL CLI; these capture tables synchronize the data of the underlying database tables:

    -- Flink SQL
    Flink SQL> CREATE TABLE products (
        id INT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'mydb',
        'table-name' = 'products'
      );
    
    Flink SQL> CREATE TABLE orders (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
       'connector' = 'mysql-cdc',
       'hostname' = 'localhost',
       'port' = '3306',
       'username' = 'root',
       'password' = '123456',
       'database-name' = 'mydb',
       'table-name' = 'orders'
     );
    
    Flink SQL> CREATE TABLE shipments (
       shipment_id INT,
       order_id INT,
       origin STRING,
       destination STRING,
       is_arrived BOOLEAN,
       PRIMARY KEY (shipment_id) NOT ENFORCED
     ) WITH (
       'connector' = 'postgres-cdc',
       'hostname' = 'localhost',
       'port' = '5432',
       'username' = 'postgres',
       'password' = 'postgres',
       'database-name' = 'postgres',
       'schema-name' = 'public',
       'table-name' = 'shipments',
       'slot.name' = 'flink'
     );
    

    Finally, create the enriched_orders table, which is used to write the joined order data into Elasticsearch:

    -- Flink SQL
    Flink SQL> CREATE TABLE enriched_orders (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       product_name STRING,
       product_description STRING,
       shipment_id INT,
       origin STRING,
       destination STRING,
       is_arrived BOOLEAN,
       PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
         'connector' = 'elasticsearch-7',
         'hosts' = 'http://localhost:9200',
         'index' = 'enriched_orders'
     );
    

    Join the order data and write it into Elasticsearch

    Use Flink SQL to join the orders table with the products and shipments tables, and write the enriched orders into Elasticsearch:

    -- Flink SQL
    Flink SQL> INSERT INTO enriched_orders
     SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
     FROM orders AS o
     LEFT JOIN products AS p ON o.product_id = p.id
     LEFT JOIN shipments AS s ON o.order_id = s.order_id;



    Problems encountered when running the INSERT statement above:

     

    The job was submitted successfully, but the web UI kept reporting an error about an incorrect time zone. The fix was to connect to MySQL and set the server time zone with SET GLOBAL time_zone = 'Asia/Shanghai';
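    For example (run inside the MySQL container; 'Asia/Shanghai' is the zone used here, so pick whichever zone matches your setup):

    -- MySQL: align the server time zone with what the mysql-cdc connector expects
    SET GLOBAL time_zone = 'Asia/Shanghai';
    SELECT @@global.time_zone;  -- verify the new setting

    Alternatively, the mysql-cdc connector exposes a 'server-time-zone' option that can be added to the WITH clause of the CDC tables so that it matches the MySQL server's time zone.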

     Running the INSERT again then failed with an error complaining that no resources were available; see:

    https://www.cnblogs.com/javasl/p/16861356.html
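    If the error is Flink's NoResourceAvailableException, it usually means the job needs more task slots than the standalone cluster currently provides. As an assumption (not necessarily the fix described in the linked post), one option is to raise the slot count in conf/flink-conf.yaml and then restart the cluster:

    # conf/flink-conf.yaml -- assumed fix: give each TaskManager more slots
    taskmanager.numberOfTaskSlots: 4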

     

     Restart Flink: bin/stop-cluster.sh, then bin/start-cluster.sh

    Start the SQL client again: bin/sql-client.sh

    Run the INSERT INTO enriched_orders statement above once more.



     

    Looking at the web UI again, there are no more error messages, which means the job is running successfully. Since this is a streaming job, it stays in the RUNNING state.

     

     

    Execute SQL in the MySQL and psql clients and the changes are synchronized to Elasticsearch almost immediately (the underlying mechanism: the CDC connectors listen to the databases' change logs, the MySQL binlog and the Postgres WAL, and replay those changes).
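    For example, a minimal round of test changes (the IDs follow from the seed data inserted above); each change should show up in the enriched_orders index, visible in Kibana at http://localhost:5601, within a few seconds:

    -- MySQL: add a new order, then mark it as placed
    INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
    UPDATE orders SET order_status = true WHERE order_id = 10004;

    -- PG: add the matching shipment, then mark it as arrived
    INSERT INTO shipments VALUES (default, 10004, 'Shanghai', 'Beijing', false);
    UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;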