【Kafka】Installation

Posted by 西维蜀黍 on 2021-10-08, Last Modified on 2022-11-13

Install

macOS

$ brew install kafka

# zookeeper is installed as a dependency of the kafka formula
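Once installed, the Kafka CLI tools are on the PATH (Homebrew ships them without the .sh suffix), so a quick way to confirm the installation, assuming a default Homebrew setup, is to print the version:

$ kafka-topics --version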

Ubuntu

Refer to https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04

Install Java

$ sudo apt install openjdk-11-jre-headless
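You can confirm the JRE is available before moving on:

$ java -version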

Creating a User for Kafka

Logged in as your non-root sudo user, create a user called kafka:

$ sudo adduser kafka

Follow the prompts to set a password and create the kafka user.

Next, add the kafka user to the sudo group with the adduser command. You need these privileges to install Kafka’s dependencies:

sudo adduser kafka sudo

Your kafka user is now ready. Log into the account using su:

su -l kafka

Now that you’ve created a Kafka-specific user, you are ready to download and extract the Kafka binaries.

Download

Use curl to download the Kafka binaries:

 curl -LO https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
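Optionally, you can verify the download against the SHA-512 checksum that Apache publishes next to the archive (a sketch, assuming the .sha512 file sits on the same archive path); compare the two outputs by eye:

curl -LO https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz.sha512
sha512sum kafka_2.12-2.8.1.tgz
cat kafka_2.12-2.8.1.tgz.sha512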

Create a directory called kafka and change to this directory. This will be the base directory of the Kafka installation:

mkdir ~/kafka && cd ~/kafka

Extract the archive you downloaded using the tar command:

tar -xvzf kafka_2.12-2.8.1.tgz --strip 1

We specify the --strip 1 flag to ensure that the archive’s contents are extracted in ~/kafka/ itself and not in another directory (such as ~/kafka/kafka_2.12-2.8.1/) inside of it.

Now that we’ve downloaded and extracted the binaries successfully, we can start configuring our Kafka server.

Configuring the Kafka Server

vim ~/kafka/config/server.properties

Change the directory where the Kafka logs are stored by modifying the log.dirs property:

log.dirs=/home/kafka/kafka/logs

Save and close the file. Now that you’ve configured Kafka, your next step is to create systemd unit files for running and enabling the Kafka server on startup.
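Before moving on, you can optionally create the configured log directory so it already exists ahead of the first start (Kafka would also create it automatically if it is missing); this assumes the /home/kafka/kafka layout used above:

mkdir -p /home/kafka/kafka/logs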

Creating Systemd Unit Files and Starting the Kafka Server

Create the unit file for zookeeper:

sudo vim /etc/systemd/system/zookeeper.service

Enter the following unit definition into the file:

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/sw/kafka/bin/zookeeper-server-start.sh /home/sw/kafka/config/zookeeper.properties
ExecStop=/home/sw/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

The [Unit] section specifies that Zookeeper requires networking and the filesystem to be ready before it can start.

The [Service] section specifies that systemd should use the zookeeper-server-start.sh and zookeeper-server-stop.sh shell files for starting and stopping the service. It also specifies that Zookeeper should be restarted if it exits abnormally.

After adding this content, save and close the file.

Grant permissions on the Kafka directory so the kafka service user can write to it:

cd /home/sw/kafka
sudo chmod -R 777 .
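Alternatively, and more restrictively, you could hand ownership of the directory to the kafka service user instead of opening it up to everyone; a sketch, assuming the same /home/sw/kafka path:

sudo chown -R kafka:kafka /home/sw/kafka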

Next, create the systemd service file for kafka:

sudo vim /etc/systemd/system/kafka.service

Enter the following unit definition into the file:

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/sw/kafka/bin/kafka-server-start.sh /home/sw/kafka/config/server.properties > /home/sw/kafka/kafka.log 2>&1'
ExecStop=/home/sw/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

The [Unit] section specifies that this unit file depends on zookeeper.service. This will ensure that zookeeper gets started automatically when the kafka service starts.

The [Service] section specifies that systemd should use the kafka-server-start.sh and kafka-server-stop.sh shell files for starting and stopping the service. It also specifies that Kafka should be restarted if it exits abnormally.
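After saving the file, reload systemd so it picks up the new unit definitions (systemctl start usually loads new unit files on its own, but reloading does no harm):

sudo systemctl daemon-reload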

Now that you have defined the units, start Kafka with the following command:

sudo systemctl start kafka

To ensure that the server has started successfully, check the status of the kafka unit:

sudo systemctl status kafka

You will receive output like this:

● kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
     Active: active (running) since Sun 2021-10-10 22:55:29 +08; 1s ago
   Main PID: 7387 (sh)
      Tasks: 24 (limit: 4615)
     Memory: 97.8M
     CGroup: /system.slice/kafka.service
             ├─7387 /bin/sh -c /home/sw/kafka/bin/kafka-server-start.sh /home/sw/kafka/config/server.properties > /home/sw/kafka/kafka.log 2>&1
             └─7388 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.he>

Oct 10 22:55:29 Truenasubuntusw systemd[1]: Started kafka.service.

You now have a Kafka server listening on port 9092.
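If you want to double-check the listener, ss (part of iproute2, installed by default on Ubuntu 20.04) should show a TCP socket bound on 9092:

ss -ltn | grep 9092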

You have started the kafka service. But if you rebooted your server, Kafka would not restart automatically. To enable the kafka service on server boot, run the following commands:

sudo systemctl enable zookeeper
sudo systemctl enable kafka

In this step, you started and enabled the kafka and zookeeper services. To check the Kafka installation, you can run a quick smoke test like the one sketched below.
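A minimal sketch with the bundled CLI tools, assuming the ~/kafka install path and the default listener on localhost:9092 (the topic name TutorialTopic is only an example):

# create a test topic
~/kafka/bin/kafka-topics.sh --create --topic TutorialTopic --bootstrap-server localhost:9092

# publish a test message
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --topic TutorialTopic --bootstrap-server localhost:9092

# read it back (Ctrl+C to exit)
~/kafka/bin/kafka-console-consumer.sh --topic TutorialTopic --bootstrap-server localhost:9092 --from-beginning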

Run

Ref https://kafka.apache.org/quickstart

Start

macOS (Homebrew)

  • zookeeper uses port 2181
  • Kafka uses port 9092

As Apache Kafka cannot work without ZooKeeper, it is mandatory to run a ZooKeeper server on the system.

# start zookeeper
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

If everything goes as expected, you will see a message in the terminal about binding to port 2181.

[2021-10-08 15:35:11,387] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

In a second terminal tab, let’s start the broker instance, that is, Kafka itself.

# modify num.partitions=1 -> num.partitions=3
$ vim /usr/local/etc/kafka/server.properties

# To start kafka:
$ brew services start kafka
# Or, if you don't want/need a background service you can just run:
$ /usr/local/opt/kafka/bin/kafka-server-start /usr/local/etc/kafka/server.properties

Now you should see that the broker is up and running too.

[2021-10-08 15:37:34,011] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-10-08 15:37:34,011] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2021-10-08 15:37:34,011] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)

Also, the ZooKeeper tab should show that a new session has been established.

[2021-10-08 15:41:00,498] INFO Creating new log file: log.45 (org.apache.zookeeper.server.persistence.FileTxnLog)

On the Kafka side, we can see the connection to ZooKeeper is established:

[2021-10-08 15:41:00,492] INFO Socket connection established, initiating session, client: /127.0.0.1:61713, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
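At this point you can sanity-check the broker from another terminal tab by listing topics (with Homebrew the CLI tools are available without the .sh suffix):

$ kafka-topics --bootstrap-server localhost:9092 --list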

macOS (manual configuration)

# prepare zookeeper
$ mkdir ~/.zookeeper; cd ~/.zookeeper; mkdir data
$ sudo vim zookeeper.properties
tickTime=2000
initLimit=10
syncLimit=5
# note: ZooKeeper does not expand ~, so use an absolute path here in practice
dataDir=~/.zookeeper/data
clientPort=2181
$ zookeeper-server-start ~/.zookeeper/zookeeper.properties

# prepare kafka
$ mkdir ~/.kafka; cd ~/.kafka
$ sudo vim server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=2
 
# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs/broker0

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk. 
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

Launch the Kafka server:

$ kafka-server-start ~/.kafka/server.properties

Via Docker

Install

Download or copy the contents of the Confluent Platform all-in-one Docker Compose file, for example:

$ curl --silent --output docker-compose.yml \
  https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.2.1-post/cp-all-in-one/docker-compose.yml

Start Confluent Platform with the -d option to run in detached mode:

$ docker-compose up -d

The above command starts Confluent Platform with a separate container for each Confluent Platform component. Your output should resemble the following:

Creating network "cp-all-in-one_default" with the default driver
Creating zookeeper ... done
Creating broker    ... done
Creating schema-registry ... done
Creating rest-proxy      ... done
Creating connect         ... done
Creating ksql-datagen    ... done
Creating ksqldb-server   ... done
Creating control-center  ... done
Creating ksqldb-cli      ... done

To verify that the services are up and running, run the following command:

$  docker-compose ps
     Name                    Command               State                                         Ports
---------------------------------------------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp, 0.0.0.0:9101->9101/tcp,:::9101->9101/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp,:::9021->9021/tcp
ksql-datagen      bash -c echo Waiting for K ...   Up
ksqldb-cli        /bin/sh                          Up
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp,:::8088->8088/tcp
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp,:::8082->8082/tcp
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp,:::8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp

If the state is not Up, rerun the docker-compose up -d command.

Create Kafka Topics

Open Confluent Control Center in a browser (it listens on port 9021 of the Docker host), for example:

http://192.168.18.134:9021/clusters
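If you prefer the CLI over the Control Center UI, you can also create a topic by exec-ing into the broker container; a sketch, assuming the broker service name from the compose file and an example topic called demo-topic:

$ docker-compose exec broker kafka-topics --create --topic demo-topic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092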

Reference