Kafka 代理

此範例示範透過 Envoy 代理的 Kafka 代理的一些基本操作。

為了您的方便,composition 提供了一個 Docker 化 Kafka 客戶端。

如果您在主機系統上安裝了 kafka-console-* 二進制檔,則可以使用 --bootstrap-server localhost:10000 主機二進制檔來遵循範例。

此外,還示範了 Envoy 為 Kafka 代理擴充功能和相關叢集指標收集的統計資料。

步驟 1:啟動所有容器

變更至 examples/kafka 目錄。

$ pwd
envoy/examples/kafka
$ docker compose pull
$ docker compose up --build -d
$ docker compose ps

         Name                      Command                State                            Ports
-----------------------------------------------------------------------------------------------------------------------
kafka_kafka-server_1   /etc/confluent/docker/run      Up             9092/tcp
kafka_proxy_1          /docker-entrypoint.sh /usr ... Up             0.0.0.0:10000->10000/tcp, 0.0.0.0:8001->8001/tcp
kafka_zookeeper_1      /etc/confluent/docker/run      Up (healthy)   2181/tcp, 2888/tcp, 3888/tcp

步驟 2:建立 Kafka 主題

首先建立一個名為 envoy-kafka-broker 的 Kafka 主題

$ export TOPIC="envoy-kafka-broker"
$ docker compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --create --topic $TOPIC

步驟 3:檢查 Kafka 主題

您可以使用 kafka-topics --list 引數檢視 Kafka 知道的主題。

檢查您建立的主題是否存在

$ docker compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --list | grep $TOPIC

步驟 4:使用 Kafka 生產者傳送訊息

接下來,使用 kafka-console-producer 為您建立的主題傳送訊息

$ export MESSAGE="Welcome to Envoy and Kafka broker filter!"
$ docker compose run --rm kafka-client /bin/bash -c " \
    echo $MESSAGE \
    | kafka-console-producer --request-required-acks 1 --broker-list proxy:10000 --topic $TOPIC"

步驟 5:使用 Kafka 消費者接收訊息

現在您可以使用 kafka-console-consumer 接收訊息

$ docker compose run --rm kafka-client kafka-console-consumer --bootstrap-server proxy:10000 --topic $TOPIC --from-beginning --max-messages 1 | grep "$MESSAGE"

步驟 6:檢查管理員 kafka_broker 統計資料

當您代理到 Kafka 代理時,Envoy 會記錄各種統計資料。

您可以透過查詢 Envoy 管理介面來檢查代理統計資料(數字可能會略有不同,因為 kafka-client 無法精確控制其網路流量)

$ curl -s "https://127.0.0.1:8001/stats?filter=kafka.kafka_broker" | grep -v ": 0" | grep "_request:"
kafka.kafka_broker.request.api_versions_request: 9
kafka.kafka_broker.request.create_topics_request: 1
kafka.kafka_broker.request.fetch_request: 2
kafka.kafka_broker.request.find_coordinator_request: 8
kafka.kafka_broker.request.join_group_request: 2
kafka.kafka_broker.request.leave_group_request: 1
kafka.kafka_broker.request.list_offsets_request: 1
kafka.kafka_broker.request.metadata_request: 12
kafka.kafka_broker.request.offset_fetch_request: 1
kafka.kafka_broker.request.produce_request: 1
kafka.kafka_broker.request.sync_group_request: 1

步驟 7:檢查管理員 kafka_service 叢集統計資料

Envoy 也會記錄 Kafka 服務的叢集統計資料

$ curl -s "https://127.0.0.1:8001/stats?filter=cluster.kafka_service" | grep -v ": 0"
cluster.kafka_service.max_host_weight: 1
cluster.kafka_service.membership_healthy: 1
cluster.kafka_service.membership_total: 1

另請參閱

Envoy Kafka 代理過濾器

深入了解 Kafka 代理過濾器。

Kafka

Apache Kafka。