1+

回顾MM1

上篇文章中我们介绍了MirrorMaker-V1(MM1),本质上MM1是Kafka的消费者和生产者结合体,可以有效地将数据从源群集移动到目标群集,但没有提供太多其他功能。

并且在MM1多年的使用过程中发现了以下局限性:

  1. 静态的黑名单和白名单
  2. topic信息不能同步
  3. 必须通过手动配置来解决active-active场景下的循环同步问题
  4. rebalance导致的性能问题
  5. 缺乏监控手段
  6. 无法保证Exactly Once
  7. 无法提供容灾恢复

第三方解决方案

针对这些无法无法满足企业需要的点,多个企业提供了自己的解决方案,例如:

  1. Linkedin的Brooklin
  2. Salesforce的Mirus
  3. Uber的uReplicator
  4. Confluent的Confluent Replicator(收费哦)

kafka带来的MM2

kafka开源社区也终于在kafka2.4带来了自己的企业级解决方案MirrorMaker-V2(MM2)。MM2修复了MM1所存在的局限性。

MM2是基于kafka connect框架开发的。与其它的kafka connecet一样MM2有source connector和sink connetor组成,不熟悉kafka connect概念的同学可以这样认为,source connector就是MM1中的消费者,它负责读取远端数据中心的数据。sink connetor是生产者,他负责将拉回来的数据写入本地的数据中心。

MM2共含有4种类型的connector:

  1. MirrorSourceConnector
  2. MirrorSinkConnector
  3. MirrorCheckpointConnector
  4. MirrorHeartbeatConnector

Screen

与MM1不同的是,MM2的source和sink两个connector包含了,源数据的消费者,远端数据的生产者,和一对AdminClient用来同步topic配置信息。

它的部署方式跟MM1相同,都是部署在目标集群方。

虽然官方提供了4中部署方式:

  1. 专用MirrorMaker集群运行
  2. 单机MirrorMaker运行
  3. 在connect cluster上运行
  4. 以MM1方式运行

本来cosmozhu准备使用第三中方式运行MM2集群,因为使用connect cluster运行后可以使用kafka connect restful api 来管理task。但是在实际操作过程中发现这部分还没有开发完成。

MM2-1

这里cosmozhu推荐使用第一种方式,第二种只能用于测试,第四种方式是为了兼容MM1的老用户。

MM2的启动脚本是connect-mirror-maker.sh,从名称上来看connect开头,很明显这块是纳入到了kafka-connect框架。它的启动配置文件为config/connect-mirror-maker.properties

MM2配置详解

以最新版本kafka2.5为例。

#定义集群别名
clusters = A, B
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .* # 允许同步topic的正则

B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .* # 允许同步topic的正则

#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置

MM2启动命令

bin/connect-mirror-maker.sh config/connect-mirror-maker.properties

参考资料:

  1. https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
  2. https://blog.cloudera.com/kafka-replication-the-case-for-mirrormaker-2-0/
  3. https://blog.cloudera.com/a-look-inside-kafka-mirrormaker-2/

作者:cosmozhu --90后的老父亲,专注于保护地球的程序员
个人网站:https://www.cosmozhu.fun
欢迎转载,转载时请注明出处。