在保证本地Java环境安装以及配置成功的情况下,Kafka的运行比我想象中容易不少,也没有出现奇奇怪怪的报错。
-
去官网(Apache Kafka)下载二进制包并解压
-
在终端窗口进入解压后的目录后,执行以下命令启动zookeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
启动之后,保留这个终端窗口不要关闭
-
新开一个终端窗口,在同一目录,执行以下命令启动Kafka服务:
./bin/kafka-server-start.sh config/server.properties
同样,启动之后,保留这个终端窗口不要关闭
-
再开一个终端窗口,用来创建topic。执行以下命令:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
test-topic是我们创建的topic的名称,可以随自己定。这个命令执行完之后,可以关掉终端窗口,也可以不关,直接在这个窗口里执行下一步操作。
-
执行以下命令启动一个生产者
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
现在,可以在该窗口中输入想要发送到消息队列的信息。
-
打开另一个新的终端窗口,进入 Kafka 目录,并使用以下命令启动消息消费者:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
这时候会发现,之前在生产者中发送的消息会显示在消费者的终端窗口中。
但是这样启动消费者会有一个问题,每次重启消费者之后都会从头消费所有生产者以及产生的消息。如果想要消费者这边只消费新的消息,不消费已经消费过的消息,就需要将消费者加入用户组后启动。具体命令如下:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --group my-consumer-group