目 录CONTENT

文章目录

Canal 同步Mysql 到 ES

小张的探险日记
2021-09-15 / 0 评论 / 0 点赞 / 512 阅读 / 7,431 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2021-12-16,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

img

1.简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

2.工作原理

Mysql 的主从复制原理

img

3.环境配置

​ 基于 本地 windows 。 使用相关组件如下

image-20210903181603289

#### 3.1 Mysql

此案例使用自建-Mysql 版本: 5.7

​ 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

修改配置文件如下后,重启服务。

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

# 查看是否打开binlog 模式
show VARIABLES like 'log_bin'
# 查看binlog 日志文件列表
show BINARY logs
# 正在写入的binlog 文件
show master status
# 验证格式为ROW
# binlog_format 有三种:ROW,STATEMENT,MIXID
show variables like 'binlog_format'

image-20210903180952481

image-20210903180932940

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

创建一个用户 并授权

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3.2 ES

ES:7.12.1

​ 下载地址:https://www.elastic.co/cn/downloads/past-releases#elasticsearch

配套下 kibana 方便视图查看数据。

image-20210903181434867

下载启动后直接使用 bin 目录下的 启动命令即可。

3.3 canal

​ canal 下载 : https://github.com/alibaba/canal/releases

整体模块:

1. canal.deployer - 模拟 mysql:slave 获取实时数据 - 此案例用到
2. canal.adapter - 用于 中间件之间的数据实时同步 - 此案例用到
3. canal.admin - 提供管理操作界面
4. canal.example - 提供获取canal.deployer数据功能,用于用户自定义处理数据。

不建议使用 v1.1.5 有Bug,同步不了ES,也不报错,又没效果

Issues : https://github.com/alibaba/canal/issues/3601

建议使用如下版本

image-20210903173735914

3.3.1 canal.deployer 启动

​ 修改 canal.deployer-1.1.5-SNAPSHOT\conf\example 下的 instance.properties 文件,配置当前实例的信息。

具体的看 下面的注释即可。

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
#tips:binlog和偏移量也可以不指定,则canal-server会从当前的位置开始读取。我建议不设置

#指定要读取binlog的MySQL的IP地址和端口
canal.instance.master.address=127.0.0.1:3306
#从指定的binlog文件开始读取数据
canal.instance.master.journal.name=
#指定偏移量,做过主从复制的应该都理解这两个参数。
canal.instance.master.position=
#mysql主库链接时起始的binlog的时间戳,默认:无
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#这几个参数是设置高可用配置的,可以配置mysql从库的信息
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
#指定连接mysql的用户密码,前序准备Mysql 环境时 创建的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
# 常见例子:
# 1. 所有表:.* or .*\\..*
# 2. canal schema下所有表: canal\\..*
# 3. canal下的以canal打头的表:canal\\.canal.*
# 4. canal schema下的一张表:canal.test1
# 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
#这个是比较重要的参数,匹配库表白名单,比如我只要test库的user表的增量数据,则这样写 test.user
canal.instance.filter.regex=.*\\..*


# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

运行:双击 canal.deployer-1.1.5-SNAPSHOT\bin\startup.bat

标识运行成功。

image-20210903183005520

3.3.2 canal.adapter 启动

​ 这一步启动完成即可 完成 Mysql 往 ES 实时同步,所以在这之前 需要先 开启 ES 和 Kibana。

image-20210903183805393

image-20210903183815864

同步什么数据到 ES 呢? 我们先准备一下测试数据:

Mysql : 在 test 库下 建表

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Es : 新建 test 索引

在 kibana 中

put test
{
  "mappings": {
      "properties": {
        "address": {
          "type": "text"
        },
        "name": {
          "type": "text"
        }
    }
  }
}

image-20210903184240679

ok 以上数据结构准备完成。

开始配置canal.dadpter

​ 配置 canal.adapter-1.1.5-SNAPSHOT\conf\application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer-
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  # 关注这里 需要 配置 数据来源的Mysql 信息 和 数据传输目的地:Es 的信息	
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200
        properties:
          mode: rest
          cluster.name: elasticsearch
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

配置 canal.adapter-1.1.5-SNAPSHOT\conf\es7\ 目录下 新建文件 文件名.yml, 名称自定义建议与 es 的 index 名称一致。

因为 新建的 es index 名称为test,所以这里命名为 test.yml

dataSourceKey: defaultDS #来源于 上个文件配置的mysql数据源
destination: example # 实例的名称
groupId: g1 # 分组
esMapping:
  _index: test # 索引名称
  _id: _id
  # 查询 同步到 es 的数据
  sql: "select a.id as _id,a.name,a.address from test a"
  #etlCondition: "where t.c_time>={}"
  commitBatch: 3000

ok,配置完成,启动adapter 来测试。

canal.adapter-1.1.5-SNAPSHOT\bin\startup.bat 命令启动

​ 启动成功

image-20210903191831218

4. 测试

默认情况没有数据

image-20210903193016834

image-20210903193201142

插入数据
INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (1, '1', '1');

image-20210903193447452

修改数据

​ 几乎实时完成数据同步

update test set name='修改完成的数据' where id=1

image-20210903193608634

删除数据
DELETE from test where id=1
0

评论区