1.前言
上一篇主要介绍了Confluent的基本概念,如果对Confluent不了解的请回看上篇文章。
2.系统架构
为了保证系统可靠性,真实生产环境中都会以集群的方式搭建,以避免单机宕机造成的影响。本文以3台机器,MySQL作为源/目的数据库来进行数据库的转移实验。
整个系统的整体结构如下图所示,因为每个组件都是独立提供服务且都能以集群的方式进行工作,因此本实验把每个服务都分别部署到3台机器来模拟集群环境。本系统主要用了Zookeeper、Kafka、Kafka-Connect、Schema-Registry 4种服务,整体架构如下:
整个系统工作流程是Source Connector集群从源MySQL DB中不断实时读取变动数据(增/删/改)再经过Schema-Registry序列化后插入到Kafka消息队列中,Sink Connector会不断从Kafka消息队列中获取数据再经过反序列化插入到目的MySQL DB中。3.Confluent 安装
本文以CentOS操作系统为实验环境。
3.1 安装JDK1.8
下载JDK1.8 64位,解压到安装目录。
设置java环境变量,在~/.bashrc文件中增加以下信息。
export PATH=/bin:${PATH}; export CLASSPATH=.: /lib/dt.jar: /lib/tools.jar复制代码
执行source~/.bashrc 使之生效。
3.2 Confluent 安装
下载Confluent Community版,下载链接为https://www.confluent.io/download/,解压到安装目录,添加环境变量。
export PATH=/bin:${PATH};export CLASSPATH= /share/java/*:${CLASSPATH}复制代码
执行source~/.bashrc 使之生效。
3.3 安装mysql-connector-jdbc.jar
因为实验数据库为MySQL,因此下载MySQL驱动包。
下载jar包,然后放到/java/kafka/目录下面。
4.服务配置
4.1 Zookeeper 配置
编辑/etc/kafka/zookeeper.properites文件,修改以下配置信息。
完整配置信息可以参看链接:
tickTime=2000 #时间单元,毫秒单位 dataDir=/var/lib/zookeeper/ #数据存储路径 clientPort=2181 #zookeeper 客户端监听端口 initLimit=5 #followers 初始化时间 syncLimit=2 #followers 同步时间 maxClientCnxns=0 #最大client连接数,值为0的时候没有上限 server.< myid >=< hostname >:< leaderport >:< electionport > 集群配置 server.1=< IP1>:2888:3888 #server1 地址,修改为自身地址 server.2=< IP2>:2888:3888 #server2 地址 server.3=< IP3>:2888:3888 #server3 地址 autopurge.snapRetainCount=3 #最近的快照保存数目 autopurge.purgeInterval=24 #快照自动清除时间间隔
修改好配置文件后,需要在每台Zookeeper Server的dataDir目录下创建myid文件来在集群中作为
唯一标示。
例如:
server1: echo 1 > myidserver2: echo 2 > myidserver3: echo 3 > myid复制代码
集群中所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.2 Kafka 配置
编辑/etc/kafka/server.properites文件,修改以下配置信息。
完整配置信息可以参看链接:
#集群地址,修改为自己地址 zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 broker.id=[1,2,3] #节点标识,每台机器依次改为1,2,3….依次递增 log.dirs=/xxx/log/kafka #日志目录,修改为自身log目录 listeners=PLAINTEXT://0.0.0.0:9092 #监听地址 advertised.listeners=PLAINTEXT://0.0.0.0:9092 #发布到zookeeper供客户端连接的地址 num.partitions=3 #分区数设置,分区间数据无序,分区内数据有序。若需要保证有序,此处设置为1 default.replication.factor=3 #消息备份数目默认1不做复制。此处改为2,备份一份。 port=9092 #服务端口,默认9092
集群中除broker.id外所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.3 Schema Registry 配置
编辑/etc/schema-registry/schema-registry.properites 文件,修改以下配置信息。
完整配置信息可以参看链接
listeners=http://0.0.0.0:8081 #监听地址 host.name=172.21.101.186 #主机地址,改为机器本身地址 port #端口号,默认8081 #zookeeper 集群列表,改为机器相应地址
kafkastore.connection.url=xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx:2181
集群中除host.name设置为本身ip外,所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.4 Kafka-Connect
4.4.1 Connect 配置
编辑/etc/schema-registry/connect-avro-distributed.properties.properites 文件,修改以下配置信息。
完整参数参考链接:
group.id=connect-cluster #Connect集群组标示,集群中此处所有配置必须相同。 kafka节点列表,host1:port1,host2:port2,...修改为相应机器地址 bootstrap.servers=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181 key.converter=io.confluent.connect.avro.AvroConverter #使用Avro为key转化类 key.converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #key的schema 访问URL value.converter= io.confluent.connect.avro.AvroConverter #使用Avro为value 转化类 value.Converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #value的schema 访问URL config.storage.replication.factor=3 #config信息备份因子,不超过集群数目 offset.storage.replication.factor=3 #偏移量备份因子,不超过集群数目 status.storage.replication.factor=3 #任务状态备份因子,不超过集群数目
集群中除schema.registry.url设置为本身ip外,集群中所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.4.2 Connect-JDBC
Kafka Connect主要由两部分组成,分别是Source Connector 和Sink Connector。Source Connector 负责从数据库中把信息读入到Kafka,Sink Connector负责把数据从Kafka 中读到数据库。实验使用的是MySql数据库,因此我们主要介绍Confluent JDBC Connector。
完整参数参考链接:
4.4.2.1 JDBC Source Connector
以下是各个参数说明,可以根据实际业务场景来配置各个参数。
connection.url #数据库连接地址
connection.user #数据库用户名 connection.password #数据库密码 table.whitelist #查询表白名单 connection.attempts #数据库连接次数 numeric.precision.mapping #是否根据进度判断数据类型 table.blacklist #表黑名单 connection.backoff.ms #数据库尝试连接间隔时间 schema.pattern #查询表时使用的schema mode #更新表时候用的模式,主要有以下4种模式. Bulk:每次都全部查询。 timestamp:根据时间戳字段是否变化来检测数据是否增加或更新。 incrementing :用自增长字段来检测数据是否有增加,不能检测数据变化和删除。 timestamp+incrementing :根据时间戳和自增长字段来检查新增更新数据,根据自增长字段来标示唯一的流数据。 incrementing.column.name #用来判断是否有新增数据的自增长字段名称,该字段值不允许为空值。 timestamp.column.name #用来判断数据是否有新增和更新的时间戳字段,该字段值不允许为空值 validate.non.null #设置是否检测数据库中自增长和时间戳字段不允许为空值,如果检查失败connector就停止启动。 query #设置数据查询语句,如果设置了就不进行全表轮询,而是只是用此sql语句去提取数据 query.condition #设置自定义查询条件,会拼接到where语句后面。(非自带,修改代码添加此参数) poll.interval.ms #数据轮询时间间隔,对数据库执行查询语句的时间间隔,此参数对数据库性能有影响 batch.max.rows # connector每次轮询获取数据的最大数,默认100条,和connector获取数据的性能有关 table.poll.interval.ms #检查表是否有增加或删除的时间间隔。 topic.prefix #使用普通查询时候以topic.prefix +表名 作为topic.自定义query语句时候以topic.prefix作为topic,此处写目的库的表名 table.types #设置要查询的表类型,默认为table,还可以设置view、system table。 timestamp.delay.interval.ms #延迟转移时间间隔,可以延迟数据转移
4.4.2.2 JDBC Sink Connector
以下是各个参数说明,根据实际业务场景来配置各个参数。
connection.url #数据库连接URL connection.user #数据库连接用户名 connection.password #数据库密码。 insert.mode # 插入数据模式 insert,upsert,update。本项目使用upsert模式,在没有相应主键数据时候直接插入,否则进行更新操作。 batch.size #每次插入数据的最大数,和数据库性能有关 topics #订阅的主题,也是目的数据库的表名
table.name.format #插入表面格式化,默认为${topic} pk.mode #主键模式。None:不设置主键,kafka:使用kafka坐标作为主键,record_key: 使用record 的key字段作为主键,record_value:使用record 的value 字段作为主键。 pk.fields #主键字段,以逗号分隔。项目中使用目的表的虚拟主键或者唯一约束字段。 fields.whitelist #插入字段白名单,如果为空则所有字段都使用 auto.create #是否自动创建目的表。 auto.evolve #当表结构发生变化时候,是否自动修改目的表 max.retries #失败重试次数 retry.backoff.ms #重试时间间隔
4.4.3 Connector REST API
在集群环境下,Connector参数只能通过RESTful接口进行参数配置,通过RESTful接口可以给任意一个服务器发送配置参数,Connector参数信息会自动转发给集群中的其它机器。使用RESTful接口配置参数后,Connector服务无需重启,即可生效。
下面详细描述Connector RESTful接口的配置参数。
4.3.3.1 RESTful Header
目前 REST接口只支持Json格式参数,因此请求头应该如下设置。
Accept: application/jsonContent-Type: application/json 复制代码
4.3.3.2 RESTful URL
接口 | 功能 |
---|---|
GET /connectors | 获取connectors列表 |
POST /connectors | 创建connector |
GET /connectors/(string:name) | 获取指定connector 信息 |
PUT /connectors/(string:name)/config | 创建或者修改指定connector 配置 |
GET /connectors/(string:name)/status | 获取指定connector 运行状态信息 |
POST /connectors/(string:name)/restart | 重启指定connector |
PUT /connectors/(string:name)/pause | 暂停指定connector |
PUT /connectors/(string:name)/resume | 重启指定connector |
DELETE /connectors/(string:name)/ | 删除指定connector |
GET /connectors/(string:name)/tasks | 获取指定connector的tasks列表信息 |
GET /connectors/(string:name)/tasks/(int:taskid)/status | 获取指定task状态 |
POST /connectors/(string:name)/tasks/(int:taskid)/restart | 重启指定task |
5. 服务启动
5.1 启动Zookeeper
zookeeper-server-start/etc/kafka/zookeeper.properties 复制代码
5.2 启动 Kafka
kafka-server-start/etc/kafka/server.properties复制代码
5.3 启动Schema-registry-start
schema-registry-start/etc/schema-registry/schema-registry.properties复制代码
5.4 启动Connector
connect-distributed/etc/schema-registry/connect-avro-distributed.properties复制代码
6. 启动Connector任务
使用Postman对Connector的REST API 接口进行访问配置。示例如下:
6.1 Header 设置
Headers Tab页面增加以下参数。
6.2 配置Source Connector
发送请求后即可完成任务的配置。6.3 配置Sink Connector
配置完毕点击Send按钮,即完成了Source、Sink Connector任务的配置和启动。此时若向Source 数据库添加或更改数据,目的数据库会实时更新过来。7.服务停止
在集群中的机器中依次关闭以下服务,关闭时候,需要保证关闭顺序的正确性。
7.1 关闭Connect-jdbc
confluent connect stop复制代码
执行命令后若提示 connect is [DOWN],则代表connect服务关闭成功。
7.2 关闭Schema-registry
confluent schema-registry stop复制代码
执行命令后若提示 schema-registry is [DOWN],则代表schema-registry服务关闭成功。
7.3 关闭Kafka
confluent kafka stop复制代码
执行命令后若提示 kafka is [DOWN],则代表upkafka服务关闭成功。
7.4 关闭Zookeeper
confluent zookeeper stop复制代码
执行命令后若提示 zookeeper is [DOWN],则代表zookeeper服务关闭成功。
8. 小结
通过实验所有数据都能实时进行转移并没有遗漏。
在破坏性测试中如杀掉服务进程、下线集群中的某台机器等异常测试中,集群都能正常的进行转移工作。
即使把集群中所有机器的服务都停止,当服务重启后,Confluent会把宕机期间的数据转移过来。可以看出Confluent确实如官方宣传所言,是一个高性能,高可靠的系统。
到此,本文也基本结束,希望此教程能帮助所有需要的人。
想要了解更多,关注公众号:七分熟pizza