Topic
Create a Topic
$ kafka-topics --bootstrap-server 192.168.18.129:9092 --topic demo_topic --create --partitions 3 --replication-factor 1
Here we create a new topic — demo_topic — which includes 3 partitions and is not replicated(we have only 1 node for now).
Debug
$ /usr/local/opt/kafka/bin/kafka-topics --zookeeper 127.0.0.1:2181 --topic demo_topic --create --partitions 3 --replication-factor 1
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:517)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
List All Topics
$ kafka-topics --bootstrap-server 192.168.18.129:9092 --list
Show Info of a Specified Topic
$ kafka-topics --bootstrap-server 192.168.18.129:9092 --topic demo_topic --describe
Topic: demo_topic TopicId: 9CXy4Q1tRXi8M4puvBs_rg PartitionCount: 3 ReplicationFactor: 1Configs: segment.bytes=1073741824
Topic: demo_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: demo_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: demo_topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Produce/Consume
Produce Events
A Kafka client communicates with the Kafka brokers via the network for writing (or reading) events. Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you need—even forever.
In a separate tab let’s create a producer to the current topic
$ kafka-console-producer --broker-list 192.168.18.129:9092 --topic demo_topic --producer-property acks=all
# Indicate keys
$ echo 'my_key:{"a":1,"b":2}' | kafka-console-producer --broker-list 192.168.18.129:9092 --topic sw_test --property "parse.key=true" --property "key.separator=:"
Here we specify brokers IPs — we have only one listening on localhost:9092, the selected topic and property which will wait for acknowledges from all replicas(only 1 in our case).
Also, let’s create a couple of messages without keys.
Consume Events
Now let’s check our demo_topic and start a consumer.
# from the beginning
$ kafka-console-consumer --bootstrap-server 192.168.18.129:9092 --topic demo_topic --from-beginning
$ kafka-console-consumer --bootstrap-server 192.168.18.129:9092 --topic demo_topic
# other param
# - specify consumer group: --consumer-property group.id=test1
- Immediately we receive two previous messages — this happens because of the flag –from-beginning.
Now try to send new messages from the producer, the consumer will receive them immediately (immediately means 10ms and more).
$ /usr/local/opt/kafka/bin/kafka-console-producer --broker-list 192.168.18.129:9092 --topic demo_topic --producer-property acks=all
$ /usr/local/opt/kafka/bin/kafka-console-consumer --bootstrap-server 192.168.18.129:9092 --topic demo_topic --from-beginning
Consumer Group
--consumer-property group.id=test1
to indicate consumer groups
$ kafka-console-producer --broker-list 192.168.18.129:9092 --topic demo_topic --producer-property acks=all
>aa
>bb
>cc
>dd
$ kafka-console-consumer --bootstrap-server 192.168.18.129:9092 --topic demo_topic --consumer-property group.id=test1
aa
bb
cc
$ kafka-console-consumer --bootstrap-server 192.168.18.129:9092 --topic demo_topic --consumer-property group.id=test1
dd
Consumption
List Info
# List consumer info of all topics
$ kafka-consumer-groups --describe --bootstrap-server 192.168.18.129:9092 --all-groups
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test1 demo_topic2 0 2 3 1 - - -
# List consumer info by a group name
$ kafka-consumer-groups --describe --bootstrap-server 192.168.18.129:9092 --group group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group demo_topic3 0 7 7 0 console-consumer-48192653-737e-49fa-9392-0c4a8165f1d0 /192.168.18.4 console-consumer
Reset Offset
$ kafka-consumer-groups --bootstrap-server 192.168.18.129:9092 --group console-consumer-53272 --reset-offsets --topic demo_topic3 --to-offset 4 --execute
-
Topic的作用域
–-all-topics
:为所有topic的所有分区调整位移-–topic t1 --topic t2
:为指定的topic的所有分区调整位移–-topic t1:0,1,2
:为指定的topic的指定分区调整位移
-
Scenarios
–to-earliest
:把位移调整到分区当前最小位移–to-latest
:把位移调整到分区当前最新位移–to-current
:把位移调整到分区当前位移–to-offset <offset>
: 把位移调整到指定位移处–shift-by N
: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动–to-datetime <datetime>
:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000–by-duration <duration>
:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S–from-file <file>
:从CSV文件中读取调整策略
-
Execution option
- 什么参数都不加:只是打印出位移调整方案,不具体执行
--dry-run --reset-offsets
: Default option. Displays which offsets will be reset if the process is executed.
-–execute
:执行真正的位移调整--export
:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
- 什么参数都不加:只是打印出位移调整方案,不具体执行
比如
# 更新到当前group最初的offset位置
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
# 更新到指定的offset位置
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
# 更新到当前offset位置(解决offset的异常)
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
# offset位置按设置的值进行位移
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
# offset设置到指定时刻开始
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000