Install
macOS
$ brew install kafka
$ brew install zookeeper   # Kafka needs ZooKeeper; install it if brew did not already pull it in as a dependency
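Homebrew places the default configuration under its etc directory; on Intel Macs this is usually /usr/local/etc/kafka (on Apple Silicon the prefix is /opt/homebrew instead). A quick check of the installed config files:
$ ls /usr/local/etc/kafka
# expect server.properties, zookeeper.properties, and related config files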
Ubuntu
Ref https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04
Install Java
$ sudo apt install openjdk-11-jre-headless
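You can confirm the JDK is on the PATH:
$ java -version
# should report an openjdk 11.x build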
Creating a User for Kafka
Logged in as your non-root sudo user, create a user called kafka:
$ sudo adduser kafka
Follow the prompts to set a password and create the kafka user.
Next, add the kafka user to the sudo group with the adduser command. You need these privileges to install Kafka's dependencies:
sudo adduser kafka sudo
Your kafka user is now ready. Log into the account using su:
su -l kafka
Now that you’ve created a Kafka-specific user, you are ready to download and extract the Kafka binaries.
Download
Use curl to download the Kafka binaries:
curl -LO https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
Create a directory called kafka and change into it. This will be the base directory of the Kafka installation:
mkdir ~/kafka && cd ~/kafka
Extract the archive you downloaded using the tar command:
tar -xvzf kafka_2.12-2.8.1.tgz --strip 1
We specify the --strip 1 flag to ensure that the archive's contents are extracted into ~/kafka/ itself and not into another directory (such as ~/kafka/kafka_2.12-2.8.1/) inside of it.
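As a sanity check, the extracted tree should now show the standard Kafka layout (directory names taken from a stock Kafka distribution):
ls ~/kafka
# bin  config  libs  LICENSE  NOTICE  site-docs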
Now that we’ve downloaded and extracted the binaries successfully, we can start configuring our Kafka server.
Configuring the Kafka Server
vim ~/kafka/config/server.properties
Change the directory where the Kafka logs are stored by modifying the log.dirs property:
log.dirs=/home/kafka/kafka/logs
Save and close the file. Now that you’ve configured Kafka, your next step is to create systemd unit files for running and enabling the Kafka server on startup.
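Kafka will create this directory on first start, but you can also create it up front so that it matches the log.dirs value above:
mkdir -p /home/kafka/kafka/logs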
Creating Systemd Unit Files and Starting the Kafka Server
Create the unit file for zookeeper:
sudo vim /etc/systemd/system/zookeeper.service
Enter the following unit definition into the file:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/sw/kafka/bin/zookeeper-server-start.sh /home/sw/kafka/config/zookeeper.properties
ExecStop=/home/sw/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit] section specifies that Zookeeper requires networking and the filesystem to be ready before it can start.
The [Service] section specifies that systemd should use the zookeeper-server-start.sh and zookeeper-server-stop.sh shell files for starting and stopping the service. It also specifies that Zookeeper should be restarted if it exits abnormally.
After adding this content, save and close the file.
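Optionally, start the zookeeper unit on its own and confirm it is active before moving on to the Kafka unit:
sudo systemctl start zookeeper
sudo systemctl status zookeeper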
Add folder permissions so that the kafka user can access the installation directory (located here under /home/sw/kafka):
cd /home/sw/kafka
sudo chmod -R 777 .
Next, create the systemd service file for kafka:
sudo vim /etc/systemd/system/kafka.service
Enter the following unit definition into the file:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/sw/kafka/bin/kafka-server-start.sh /home/sw/kafka/config/server.properties > /home/sw/kafka/kafka.log 2>&1'
ExecStop=/home/sw/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit] section specifies that this unit file depends on zookeeper.service. This ensures that zookeeper gets started automatically when the kafka service starts.
The [Service] section specifies that systemd should use the kafka-server-start.sh and kafka-server-stop.sh shell files for starting and stopping the service. It also specifies that Kafka should be restarted if it exits abnormally.
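If systemd does not pick up the newly created unit files, reload its configuration first:
sudo systemctl daemon-reload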
Now that you have defined the units, start Kafka with the following command:
sudo systemctl start kafka
To ensure that the server has started successfully, check the journal logs for the kafka unit:
sudo systemctl status kafka
You will receive output like this:
● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
Active: active (running) since Sun 2021-10-10 22:55:29 +08; 1s ago
Main PID: 7387 (sh)
Tasks: 24 (limit: 4615)
Memory: 97.8M
CGroup: /system.slice/kafka.service
├─7387 /bin/sh -c /home/sw/kafka/bin/kafka-server-start.sh /home/sw/kafka/config/server.properties > /home/sw/kafka/kafka.log 2>&1
└─7388 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.he>
Oct 10 22:55:29 Truenasubuntusw systemd[1]: Started kafka.service.
You now have a Kafka server listening on port 9092.
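You can double-check the listener from a shell on the server (ss is part of iproute2 on Ubuntu):
ss -ltn | grep 9092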
You have started the kafka service. But if you rebooted your server, Kafka would not restart automatically. To enable the kafka service on server boot, run the following commands:
sudo systemctl enable zookeeper
sudo systemctl enable kafka
In this step, you started and enabled the kafka and zookeeper services. In the next step, you will check the Kafka installation.
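As a quick smoke test (the topic name TutorialTopic is only an example), you can create a topic, publish a message to it, and read it back with the console tools shipped in ~/kafka/bin:
~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TutorialTopic
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TutorialTopic
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
The consumer should print "Hello, World"; press Ctrl+C to stop it.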
Run
Ref https://kafka.apache.org/quickstart
Start
macOS (Homebrew)
- zookeeper uses port 2181
- Kafka uses port 9092
As Apache Kafka cannot work without ZooKeeper, the ZooKeeper server must be running before the broker is started.
# start zookeeper
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
Everything should go as expected, and you will see a message in the terminal about binding to port 2181.
[2021-10-08 15:35:11,387] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
In a second terminal tab, let's start the broker instance, Kafka itself. To do this, simply run from the same directory:
# modify num.partitions=1 -> num.partitions=3
$ vim /usr/local/etc/kafka/server.properties
# To start kafka:
$ brew services start kafka
# Or, if you don't want/need a background service you can just run:
$ /usr/local/opt/kafka/bin/kafka-server-start /usr/local/etc/kafka/server.properties
Now you should see that the broker is up and running too.
[2021-10-08 15:37:34,011] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-10-08 15:37:34,011] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-10-08 15:37:34,011] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
Also, the tab with ZooKeeper should show that a new session has been established.
[2021-10-08 15:41:00,498] INFO Creating new log file: log.45 (org.apache.zookeeper.server.persistence.FileTxnLog)
From Kafka, we can see the connection to ZooKeeper is established:
[2021-10-08 15:41:00,492] INFO Socket connection established, initiating session, client: /127.0.0.1:61713, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
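To confirm the broker is reachable, you can also list topics with the Homebrew-installed CLI (Homebrew's wrappers drop the .sh suffix):
$ kafka-topics --bootstrap-server localhost:9092 --list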
macOS (manual configuration)
# prepare zookeeper
$ mkdir -p ~/.zookeeper/data && cd ~/.zookeeper
$ vim zookeeper.properties
tickTime=2000
initLimit=10
syncLimit=5
# ZooKeeper does not expand ~, so use an absolute path here
dataDir=/Users/<your-user>/.zookeeper/data
clientPort=2181
$ zookeeper-server-start ~/.zookeeper/zookeeper.properties
# prepare kafka
$ mkdir ~/.kafka && cd ~/.kafka
$ vim server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs/broker0
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
Launch the Kafka server:
$ kafka-server-start ~/.kafka/server.properties
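With ZooKeeper and the broker both running, a quick end-to-end check (the topic name test-topic is just an example; omitting --partitions falls back to the num.partitions default above):
$ kafka-topics --bootstrap-server localhost:9092 --create --topic test-topic
$ kafka-topics --bootstrap-server localhost:9092 --describe --topic test-topic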
Via Docker
Install
Download or copy the contents of the Confluent Platform all-in-one Docker Compose file, for example:
$ curl --silent --output docker-compose.yml \
https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.2.1-post/cp-all-in-one/docker-compose.yml
Start Confluent Platform with the -d option to run in detached mode:
$ docker-compose up -d
The above command starts Confluent Platform with a separate container for each Confluent Platform component. Your output should resemble the following:
Creating network "cp-all-in-one_default" with the default driver
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating rest-proxy ... done
Creating connect ... done
Creating ksql-datagen ... done
Creating ksqldb-server ... done
Creating control-center ... done
Creating ksqldb-cli ... done
To verify that the services are up and running, run the following command:
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp, 0.0.0.0:9101->9101/tcp,:::9101->9101/tcp
connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp
control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp,:::9021->9021/tcp
ksql-datagen bash -c echo Waiting for K ... Up
ksqldb-cli /bin/sh Up
ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp,:::8088->8088/tcp
rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp,:::8082->8082/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp,:::8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp
If the state is not Up, rerun the docker-compose up -d command.
Create Kafka Topics
Open Confluent Control Center at http://192.168.18.134:9021/clusters (use your Docker host's address) to create and inspect topics.
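Alternatively, topics can be created from the command line by exec'ing into the broker container (the container name broker comes from the Compose file above; the topic name users is just an example):
$ docker-compose exec broker kafka-topics --bootstrap-server localhost:9092 --create --topic users --partitions 1 --replication-factor 1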
Reference
- Kafka: The Definitive Guide
- https://kafka.apache.org/intro
- https://kafka.apache.org/quickstart
- https://kafka.apache.org/documentation/
- https://medium.com/pharos-production/apache-kafka-macos-installation-guide-a5a3754f09c
- https://gist.github.com/jarrad/3528a5d9128fe693ca84
- https://stackoverflow.com/questions/53428903/zookeeper-is-not-a-recognized-option-when-executing-kafka-console-consumer-sh
- https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html
- https://stackoverflow.com/questions/38549867/how-to-set-group-name-when-consuming-messages-in-kafka-using-command-line