Posted by 西维蜀黍 on 2022-11-13, Last Modified on 2023-05-02


Create a Topic

$ kafka-topics --bootstrap-server localhost:9092 --topic demo_topic --create --partitions 3 --replication-factor 1

Here we create a new topic, demo_topic, with 3 partitions and no replication (we have only 1 node for now).
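
A replication factor cannot exceed the number of available brokers, which is why 1 is the only valid choice on a single node. A minimal sketch of that rule, assuming a hypothetical helper named can_replicate (it is not part of Kafka and only illustrates the check):

```shell
# Hypothetical helper, not a Kafka tool: succeeds when the requested
# replication factor can be satisfied by the given number of brokers.
can_replicate() {
  replication_factor=$1
  broker_count=$2
  [ "$replication_factor" -ge 1 ] && [ "$replication_factor" -le "$broker_count" ]
}

if can_replicate 1 1; then
  echo "ok: replication factor 1 fits a single broker"
fi
if ! can_replicate 3 1; then
  echo "rejected: replication factor 3 needs at least 3 brokers"
fi
```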


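Note that the --zookeeper option no longer works on recent Kafka versions (it was removed from kafka-topics in Kafka 3.0); use --bootstrap-server instead:

$ /usr/local/opt/kafka/bin/kafka-topics --zookeeper --topic demo_topic --create --partitions 3 --replication-factor 1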
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
	at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
	at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
	at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
	at joptsimple.OptionParser.parse(OptionParser.java:396)
	at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:517)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)

List All Topics

$ kafka-topics --bootstrap-server localhost:9092 --list

Show Info of a Specified Topic

$ kafka-topics --bootstrap-server localhost:9092 --topic demo_topic --describe
Topic: demo_topic	TopicId: 9CXy4Q1tRXi8M4puvBs_rg	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: demo_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: demo_topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: demo_topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0


Produce Events

A Kafka client communicates with the Kafka brokers via the network for writing (or reading) events. Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you need—even forever.

In a separate tab, let's create a producer for the current topic:

$ kafka-console-producer --broker-list localhost:9092 --topic demo_topic --producer-property acks=all

# Indicate keys
$ echo 'my_key:{"a":1,"b":2}' | kafka-console-producer --broker-list localhost:9092 --topic sw_test --property "parse.key=true" --property "key.separator=:"

Here we specify the broker addresses (we have only one, listening on localhost:9092), the target topic, and the acks=all property, which makes the producer wait for acknowledgements from all replicas (only 1 in our case).

Also, let’s create a couple of messages without keys.
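
For the keyed example above, parse.key=true together with key.separator=: tells the console producer to treat the text before the separator as the record key and the rest as the value. A rough sketch of that split in plain shell; the helper names record_key and record_value are hypothetical, and splitting at the first occurrence of the separator is an assumption about the producer's behavior:

```shell
# Hypothetical helpers illustrating the split performed when parse.key=true.
# Assumption: the line is split at the FIRST occurrence of the separator.
record_key() {
  line=$1
  sep=$2
  printf '%s\n' "${line%%"$sep"*}"   # strip the longest suffix starting at the separator
}

record_value() {
  line=$1
  sep=$2
  printf '%s\n' "${line#*"$sep"}"    # strip the shortest prefix ending at the separator
}

record_key   'my_key:{"a":1,"b":2}' ':'   # prints my_key
record_value 'my_key:{"a":1,"b":2}' ':'   # prints {"a":1,"b":2}
```

Because only the first separator counts in this sketch, colons inside the JSON value stay part of the value.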

Consume Events

Now let’s check our demo_topic and start a consumer.

# from the beginning
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic demo_topic --from-beginning

# only new messages
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic demo_topic

# other parameters
# - specify a consumer group: --consumer-property group.id=test1
  • We immediately receive the two previous messages because of the --from-beginning flag.

Now try sending new messages from the producer; the consumer receives them almost immediately (in practice, after 10 ms or more).

$ /usr/local/opt/kafka/bin/kafka-console-producer --broker-list localhost:9092 --topic demo_topic --producer-property acks=all
$ /usr/local/opt/kafka/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic demo_topic --from-beginning

Consumer Group

  • Use --consumer-property group.id=test1 to specify the consumer group. Consumers started with the same group.id share the topic's partitions.
$ kafka-console-producer --broker-list localhost:9092 --topic demo_topic --producer-property acks=all
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic demo_topic --consumer-property group.id=test1
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic demo_topic --consumer-property group.id=test1


List Info

# List consumer info for all groups
$ kafka-consumer-groups --describe --bootstrap-server localhost:9092 --all-groups
test1           demo_topic2     0          2               3               1               -               -               -

# List consumer info by a group name
$ kafka-consumer-groups --describe --bootstrap-server localhost:9092 --group group
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
group           demo_topic3     0          7               7               0               console-consumer-48192653-737e-49fa-9392-0c4a8165f1d0 /   console-consumer
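
The LAG column in the output above is LOG-END-OFFSET minus CURRENT-OFFSET for each partition. As a small sketch, the total lag can be summed from such output with awk; total_lag is a hypothetical helper name, and the column positions are assumed to match the header shown above:

```shell
# Hypothetical helper: sum (LOG-END-OFFSET - CURRENT-OFFSET) over all data rows.
# Assumes column 4 is CURRENT-OFFSET and column 5 is LOG-END-OFFSET, as in the
# header above. Rows whose offsets are not numeric (the header, or "-") are skipped.
total_lag() {
  awk '$4 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ { sum += $5 - $4 } END { print sum + 0 }'
}

# Sample rows taken from the outputs shown in this post.
printf '%s\n' \
  'GROUP  TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG' \
  'test1  demo_topic2  0          2               3               1' \
  'group  demo_topic3  0          7               7               0' | total_lag
# prints 1
```

Against a live broker, the output of kafka-consumer-groups --describe could be piped into total_lag in the same way.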

Reset Offset

$ kafka-consumer-groups --bootstrap-server localhost:9092 --group console-consumer-53272 --reset-offsets --topic demo_topic3 --to-offset 4 --execute
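  • Topic scope

    • --all-topics: reset offsets for all partitions of all topics
    • --topic t1 --topic t2: reset offsets for all partitions of the specified topics
    • --topic t1:0,1,2: reset offsets for the specified partitions of the specified topic
  • Scenarios

    • --to-earliest: reset to the earliest offset currently available in the partition
    • --to-latest: reset to the latest offset of the partition
    • --to-current: reset to the partition's current offset
    • --to-offset <offset>: reset to the given offset
    • --shift-by N: reset to the current offset + N; N may be negative, which moves the offset backward
    • --to-datetime <datetime>: reset to the earliest offset whose timestamp is later than the given time; the datetime format is yyyy-MM-ddTHH:mm:ss.xxx, for example 2017-08-04T00:00:00.000
    • --by-duration <duration>: reset to the offset at the given interval before the current time; the duration format is PnDTnHnMnS, for example PT0H5M0S
    • --from-file <file>: read the reset plan from a CSV file
  • Execution options

    • No option: only print the reset plan without executing it
      • --dry-run --reset-offsets: Default option. Displays which offsets will be reset if the process is executed.
    • --execute: actually perform the offset reset
    • --export: print the reset plan in CSV format so it can be saved to a CSV file and reused later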
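
To make the --shift-by arithmetic concrete, here is a minimal sketch of how the target offset could be computed for one partition. The helper name shift_offset is hypothetical, and keeping the result within the partition's earliest and latest offsets is an assumption about the tool's behavior, not something confirmed by this post:

```shell
# Hypothetical helper: target offset of --shift-by N for a single partition.
# Assumption: the result is kept within [earliest, latest] for that partition.
shift_offset() {
  current=$1
  shift_by=$2
  earliest=$3
  latest=$4
  target=$((current + shift_by))
  if [ "$target" -lt "$earliest" ]; then target=$earliest; fi
  if [ "$target" -gt "$latest" ]; then target=$latest; fi
  echo "$target"
}

shift_offset 7 -3 0 7        # prints 4 (three records back)
shift_offset 7 -100000 0 7   # prints 0 (limited to the earliest offset)
```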


# Reset to the earliest offset for the group
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute

# Reset to a specific offset
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute

# Reset to the current offset (to fix offset anomalies)
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute

# Shift the offset by the given value
kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute

# Reset the offset to a given point in time (no --execute here, so this only prints the plan)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000