0

通过上一篇文章Kafka:MirrorMaker-V1我们已经知道了MirrorMaker-V1的基本概念,这篇文章我们来给Kafka-cluster搭建一个mirror。

环境准备

操作系统: centOs7
java: OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
zookeeper: apache-zookeeper-3.6.1
kafka: kafka_2.11-2.4.1

kafka集群架构

Mirror创建步骤

MirrorMaker-V1是一个独立的工具,可以在任何能访问到两个Kafka-cluster的机器上启动

启动命令

bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist '.*sync'

分析一下这个命令

--consumer.config 指的是消费者配置文件路径当然这里的消费者指的是MirrorMaker-V1,消费的数据来自于source-cluster。

#config/consumer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093             # source-cluster的broker list
group.id=test-consumer-group1                                         # 自定义一个消费者的group id
auto.offset.reset=                                                    # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据; earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费; none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

--producer.config 指的是生产者配置文件路径当然这里的消费者代表的也是MirrorMaker-V1,生产的数据目的地是destination-cluster

#config/producer.properties 在网上看到有在此配置zookeeper的应该是之前的老版本。kafka_2.11-2.4.1中不需要
bootstrap.servers=kafka-cluster2:9092,kafka-cluster2:9093             # destination-cluster的broker list
compression.type=none                                                 # 数据压缩方式none, gzip, snappy, lz4, zstd
partitioner.class=                                                    # 指定分区程序路径,默认为随机分区
request.timeout.ms=                                                   # 请求超时时间
max.block.ms=                                                         # `KafkaProducer.send` and `KafkaProducer.partitionsFor` 阻塞时间
linger.ms=                                                            # 等待指定时间后批量发送
max.request.size=                                                     # 发送消息最大字节数
batch.size=                                                           # 单次批量处理的字节数
buffer.memory=                                                        # 指定等待发送消息的缓冲区大小

--whitelist指定的是同步topic的白名单,这是个必输项。可以用Java-style regular expressions按照正则表达式来订阅topics,--whitelist '.*sync'代表的就是订阅所有以sync结尾的topic。

验证MirrorMaker-V1是否成功创建

启动MirrorMaker-V1后,利用kafka-producer-perf-test.sh向Kafka-cluster1中g_sync写入数据。

bin/kafka-producer-perf-test.sh --topic g_sync --num-records 10 --throughput 1 --producer-props bootstrap.servers=kafka-cluster1:9092,kafka-cluster1:9093 --record-size 100

7 records sent, 1.3 records/sec (0.00 MB/sec), 90.1 ms avg latency, 524.0 ms max latency.
10 records sent, 1.044714 records/sec (0.00 MB/sec), 65.80 ms avg latency, 524.00 ms max latency, 12 ms 50th, 524 ms 95th, 524 ms 99th, 524 ms 99.9th.

数据写入完成后在Kafka-cluster2中消费数据,发现数据已成功同步

bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster2:9092,kafka-cluster2:9093 --topic g_sync --from-beginning

SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL
SSXVNJHPDQDXVCRASTVYBCWVMGNYKRXVZXKGXTSPSJDGYLUEGQFLAQLOCFLJBEPOWFNSOMYARHAOPUFOJHHDXEHXJBHWGSMZJGNL

注意:经过多次验证发现,当kafka-cluster1中创建topic后需要重启MirrorMaker,才能在kafka-cluster2中自动创建topic并完成数据同步。这个问题cosmozhu还没有找到解决方法,有解决了这个问题的同学,可以留言给我。