如何在 Pulsar 中使用 Debezium Connector

作者:Jia Zhai

翻译:Jennifer

编辑:Anonymitaet


本文于 2019 年 5 月 23 日发表在  Debezium 社区。


Bot,向诸位爱卿介绍一下朕的新晋宠妃 Debezium connector 。


Pulsar IO 框架运行 Debezium connector 能捕获数据变化,并能将不同数据库的数据变化保存至 Pulsar。


本文介绍了如何在 Pulsar 中使用 Debezium connector 捕获 MySQL 表的数据变化,并将这些变化保存至 Pulsar。


如何在 Pulsar 中使用 Debezium Connector 


Debezium(https://debezium.io/)是捕获数据变化(Change Data Capture)的一个开源项目。


Debezium 基于 Apache Kafka Connect (https://kafka.apache.org/documentation/#connectapi)开发,支持多种数据库,例如, MySQL、 MongoDB、PostgreSQL、Oracle 以及 SQL Server。


Apache Pulsar(http://pulsar.apache.org)包含一套基于 Pulsar IO 框架的内置 connector (https://pulsar.apache.org/docs/en/io-connectors),它与 Apache Kafka Connect 功能相同。


从 2.3.0 版本开始,Pulsar IO 支持开箱即用的 Debezium 源连接器(http://pulsar.apache.org/docs/en/2.3.0/io-cdc-debezium),你可以利用 Debezium 将数据库中的变化瞬时保存至 Apache Pulsar。


教程步骤预览


除了事件流的存储从 Kafka 转到 Pulsar,其他步骤类似于 Debezium 教程(https://debezium.io/docs/tutorial)。


本教程主要包含六个步骤:


  1. 启动 MySQL。


  2. 启动 standalone Pulsar。


  3. 在 Pulsar IO 中启动 Debezium connector。Pulsar IO 从 MySQL 读取数据变化。


  4. 订阅 Pulsar topic,监控 MySQL 数据变化。


  5. 更改 MySQL 表的数据 ,验证更改是否立刻记录在 Pulsar topic 中。


  6. 清除环境。


1. 启动 MySQL


启动 MySQL, Debezium 会从 MySQL 捕获数据变化。

在新的终端窗口中启动容器,运行 MySQL。

MySQL 有一个预配置好的数据库,名为 inventory。

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8


结果如下所示:

2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events

2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.

Version: '5.7.25-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)


2. 启动 Pulsar


在本地用 standalone 模式启动 Pulsar。 Pulsar 2.3.0 版本新增了在 Pulsar IO 中运行 Debezium connector 的功能。


下载以下文件:

  • Apache Pulsar 2.3.0 二进制包,点击 https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz。

  • Kafka Adapter Connector,点击 (https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar。


在 Pulsar 中,所有 Pulsar IO connector 都独立封装为  NAR(https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd)文件。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar

$ tar zxf apache-pulsar-2.3.0-bin.tar.gz

$ cd apache-pulsar-2.3.0

$ mkdir connectors

$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors

$ bin/pulsar standalone



3. 启动 Debezium Connector


在另一个终端窗口,用 local run 模式启动 Debezium MySQL connector。


debezium-mysql-source-config.yaml 文件包含所有配置,主要参数列在 configs 节点下。yaml 格式的文件有 task.class 参数。配置文件还包含 MySQL 相关参数(例如,服务器、端口、用户、密码)以及 history 和 offset 存储文件中 Pulsar topics 的两个名字。


以下是 debezium-mysql-source-config.yaml 文件的内容:


表会自动创建在 MySQL 中,Debezium connector 一开始就会从 MySQL binlog 文件读取历史记录。输出结果显示 connector 被触发后,处理了 47 条记录。


更多关于如何管理 connector 的信息,查阅 Pulsar IO 文档(http://pulsar.apache.org/docs/en/io-managing/)。


Debezium 捕获和读取的记录会自动发布至 Pulsar topic。在另一终端窗口运行以下命令,将显示 Pulsar 当前的 topic。

$ bin/pulsar-admin topics list public/default


每个表格的变化数据会单独存储在 Pulsar topic 中。除了与数据库表相关的 topic,另外两个名为 history-topic 和 offset-topic 的 topic 分别储存与历史和偏移量相关的数据。

persistent://public/default/history-topic

persistent://public/default/offset-topic


4. 订阅 Pulsar topic


persistent://public/default/dbserver1.inventory.products topic 为例。使用 CLI 命令消费该 topic,并监控 products 表的变化。

$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0


输出结果如下:

22:17:41.201 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]

22:17:41.223 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0


也可以使用 offset topic 监控偏移量变化,表的变化存储在 

persistent://public/default/dbserver1.inventory.products topic 中。

$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0  


5. 验证 Pulsar topic 


在 Docker 中启动 MySQL, 更改 MySQL 的 product 表的数据。

$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'


运行命令后,出现 MySQL CLI,重命名 products 表中的两个项目。

mysql> use inventory;

mysql> show tables;

mysql> SELECT * FROM  products ;

mysql> UPDATE products SET name='1111111111' WHERE id=101;

mysql> UPDATE products SET name='1111111111' WHERE id=107;


此时,消费 products topic 的终端页面显示已添加了两个变化。


此时,消费 offset topic 的终端页面显示已添加了两个偏移量。


此时, local-run 连接器的终端显示已处理了两条记录。


6. 清除环境


使用 Ctrl+C 关闭终端。


使用 docker ps 和 docker kill 命令关闭与 MySQL 相关的容器。

mysql> quit


$ docker ps

CONTAINER ID  IMAGE COMMAND CREATED STATUS PORTS  NAMES

84d66c2f591d  debezium/example-mysql:0.8 "docker-entrypoint.s…"   About an hour ago   Up About an hour    0.0.0.0:3306->3306/tcp, 33060/tcp mysql


$ docker kill 84d66c2f591d


如需删除 Pulsar 数据,删除 Pulsar binary 目录中的数据目录。

$ pwd

/Users/jia/ws/releases/apache-pulsar-2.3.0


$ rm -rf data


总结


Pulsar IO 框架运行 Debezium connector 能捕获数据变化,并能将不同数据库的数据变化保存至 Pulsar。


本文介绍了如何在 Pulsar 中使用Debezium connector 捕获 MySQL 表的数据变化,并将这些变化保存至 Pulsar。


我们会持续改进在 Pulsar 中使用 Debezium 连接器的体验。在 Pulsar 2.4.0 版本后,操作会变得更加简单。


作者介绍


翟佳是 StreamNative 的核心工程师、开源项目 Apache Pulsar 和 Apache BookKeeper 的 PMC成员,并持续为这两个开源项目作出了杰出贡献。




点击阅读原文,查看英语原文。


评论