环境说明
kafka 集群: strimzi-operator部署的kafka集群
zookeeper 节点:3个
kafka 节点: 1个
channel: Apache Kafka Channel
Broker: Apache Kafka Broker
namespace:event-kafka-demo
创建 ns
# kubectl create ns event-kafka-demo
namespace/event-kafka-demo created
部署 eventing-kafka-controller
下载 eventing-kafka-controller.yaml
# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-controller.yaml
替换 gcr.io 地址
# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-controller.yaml
部署 eventing-kafka-controller
# kubectl apply -f eventing-kafka-controller.yaml
configmap/kafka-broker-config created
configmap/kafka-channel-config created
customresourcedefinition.apiextensions.k8s.io/kafkachannels.messaging.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumers.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/consumergroups.internal.kafka.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasinks.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/kafkasources.sources.knative.dev created
clusterrole.rbac.authorization.k8s.io/eventing-kafka-source-observer created
configmap/config-kafka-source-defaults created
configmap/config-kafka-autoscaler created
configmap/config-kafka-descheduler created
configmap/config-kafka-features created
configmap/config-kafka-leader-election created
configmap/config-kafka-scheduler created
configmap/kafka-config-logging created
configmap/config-namespaced-broker-resources created
configmap/config-tracing configured
clusterrole.rbac.authorization.k8s.io/knative-kafka-addressable-resolver created
clusterrole.rbac.authorization.k8s.io/knative-kafka-channelable-manipulator created
clusterrole.rbac.authorization.k8s.io/kafka-controller created
serviceaccount/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller created
clusterrolebinding.rbac.authorization.k8s.io/kafka-controller-addressable-resolver created
deployment.apps/kafka-controller created
clusterrole.rbac.authorization.k8s.io/kafka-webhook-eventing created
serviceaccount/kafka-webhook-eventing created
clusterrolebinding.rbac.authorization.k8s.io/kafka-webhook-eventing created
mutatingwebhookconfiguration.admissionregistration.k8s.io/defaulting.webhook.kafka.eventing.knative.dev created
mutatingwebhookconfiguration.admissionregistration.k8s.io/pods.defaulting.webhook.kafka.eventing.knative.dev created
secret/kafka-webhook-eventing-certs created
validatingwebhookconfiguration.admissionregistration.k8s.io/validation.webhook.kafka.eventing.knative.dev created
deployment.apps/kafka-webhook-eventing created
service/kafka-webhook-eventing created
查看 eventing-kafka-controller pod
# kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
eventing-controller-75d79c8bfb-fpv26 1/1 Running 7 (84m ago) 6d16h
eventing-webhook-79bf558944-4j6rn 1/1 Running 7 (84m ago) 6d16h
imc-controller-8d958bbf5-xvhm7 1/1 Running 5 (84m ago) 4d20h
imc-dispatcher-799f9f548-fdd99 1/1 Running 6 (84m ago) 4d20h
kafka-controller-8c5487cdb-6w7gp 1/1 Running 0 72s
kafka-webhook-eventing-5c9c7cdb9d-kqcrq 1/1 Running 0 55s
mt-broker-controller-7b98899b48-m45w6 1/1 Running 5 (84m ago) 4d20h
mt-broker-filter-788b867775-gwqn2 1/1 Running 7 (84m ago) 4d20h
mt-broker-ingress-5f5b69fb49-7d6wn 1/1 Running 14 (82m ago) 4d20h
pingsource-mt-adapter-6588d57445-5mrgq 1/1 Running 6 (84m ago) 5d22h
layer
下载 eventing-kafka-channel.yaml
# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-channel.yaml
替换 gcr.io 地址
# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-channel.yaml
部署 eventing-kafka-channel
# kubectl apply -f eventing-kafka-channel.yaml
deployment.apps/kafka-channel-dispatcher created
deployment.apps/kafka-channel-receiver created
service/kafka-channel-ingress created
查看 eventing-kafka-channel pod
# kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
eventing-controller-75d79c8bfb-fpv26 1/1 Running 7 (100m ago) 6d16h
eventing-webhook-79bf558944-4j6rn 1/1 Running 7 (100m ago) 6d16h
imc-controller-8d958bbf5-xvhm7 1/1 Running 5 (100m ago) 4d21h
imc-dispatcher-799f9f548-fdd99 1/1 Running 6 (100m ago) 4d21h
kafka-channel-dispatcher-56b4997b8b-ctvx2 1/1 Running 0 8m52s
kafka-channel-receiver-6dd986d55d-b89m5 1/1 Running 0 8m52s
kafka-controller-8c5487cdb-6w7gp 1/1 Running 0 17m
kafka-webhook-eventing-5c9c7cdb9d-kqcrq 1/1 Running 0 17m
mt-broker-controller-7b98899b48-m45w6 1/1 Running 5 (100m ago) 4d21h
mt-broker-filter-788b867775-gwqn2 1/1 Running 7 (100m ago) 4d21h
mt-broker-ingress-5f5b69fb49-7d6wn 1/1 Running 14 (98m ago) 4d21h
pingsource-mt-adapter-6588d57445-5mrgq 1/1 Running 6 (100m ago) 5d22h
查看 kafka-channel api
# kubectl api-resources |grep channel
channels ch messaging.knative.dev/v1 true Channel
inmemorychannels imc messaging.knative.dev/v1 true InMemoryChannel
kafkachannels kc messaging.knative.dev/v1beta1 true KafkaChannel
配置 KafkaChannel 为默认 channel
查看 KafkaChannel group 信息
# kubectl explain KafkaChannel
GROUP: messaging.knative.dev
KIND: KafkaChannel
VERSION: v1beta1
...
default-channel-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: default-ch-webhook
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
app.kubernetes.io/version: devel
app.kubernetes.io/part-of: knative-eventing
data:
default-ch-config: |
clusterDefault:
apiVersion: messaging.knative.dev/v1
kind: InMemoryChannel
namespaceDefaults:
event-kafka-demo: # 按需指定名称空间
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
spec:
numPartitions: 5 # 在Topic上默认使用的partition的数量,默认为1;
replicationFactor: 1 # 在Topic上默认使用的复制因子,其值不能大于Kafka上的broker数量,即可用节点数,默认值为1;
创建 default-channel
# kubectl apply -f default-channel-config.yaml
configmap/default-ch-webhook configured
layer
下载 eventing-kafka-broker.yaml
# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-broker.yaml
替换 gcr.io 地址
# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-broker.yaml
部署 eventing-kafka-broker
# kubectl apply -f eventing-kafka-broker.yaml
configmap/config-kafka-broker-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
serviceaccount/knative-kafka-broker-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-broker-data-plane created
deployment.apps/kafka-broker-dispatcher created
deployment.apps/kafka-broker-receiver created
service/kafka-broker-ingress created
查看 eventing-kafka-broker pod
# kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
eventing-controller-75d79c8bfb-fpv26 1/1 Running 7 (3h12m ago) 6d18h
eventing-webhook-79bf558944-4j6rn 1/1 Running 7 (3h12m ago) 6d18h
imc-controller-8d958bbf5-xvhm7 1/1 Running 5 (3h12m ago) 4d22h
imc-dispatcher-799f9f548-fdd99 1/1 Running 6 (3h12m ago) 4d22h
kafka-broker-dispatcher-7dcdf8fb6f-smxh8 1/1 Running 0 69s
kafka-broker-receiver-f7f48786f-bz64p 1/1 Running 0 69s
kafka-channel-dispatcher-56b4997b8b-ctvx2 1/1 Running 0 101m
kafka-channel-receiver-6dd986d55d-b89m5 1/1 Running 0 101m
kafka-controller-8c5487cdb-6w7gp 1/1 Running 0 109m
kafka-webhook-eventing-5c9c7cdb9d-kqcrq 1/1 Running 0 109m
mt-broker-controller-7b98899b48-m45w6 1/1 Running 5 (3h12m ago) 4d22h
mt-broker-filter-788b867775-gwqn2 1/1 Running 7 (3h12m ago) 4d22h
mt-broker-ingress-5f5b69fb49-7d6wn 1/1 Running 14 (3h10m ago) 4d22h
pingsource-mt-adapter-6588d57445-5mrgq 1/1 Running 6 (3h12m ago) 6d
配置 kafka broker
configmap-kafka-broker-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "5" # 在Topic上默认使用的partition的数量
# Replication factor of topic messages.
default.topic.replication.factor: "1" # 在Topic上默认使用的复制因子,其值不能大于Kafka上的broker数量,即可用节点数
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092" # 即Kafka集群的Bootstrap Server的访问入口
更新 kafka broker 配置
# kubectl apply -f configmap-kafka-broker-config.yaml
configmap/kafka-broker-config configured
配置 kafka broker 为默认的broker
configmap-default-br-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-defaults
namespace: knative-eventing
data:
default-br-config: |
clusterDefault:
brokerClass: MTChannelBasedBroker
apiVersion: v1
kind: ConfigMap
name: config-br-default-channel
namespace: knative-eventing
delivery:
retry: 10
backoffPolicy: exponential
backoffDelay: PT0.2S
namespaceDefaults:
event-kafka-demo:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
更新默认 broker 类型配置
# kubectl apply -f configmap-default-br-config.yaml
configmap/config-br-defaults configured
配置 config-br-default-channel
config-br-default-channel.yaml
apiVersion: v1
kind: ConfigMap
metadata:
labels:
app.kubernetes.io/name: knative-eventing
app.kubernetes.io/version: 1.12.0
name: config-br-default-channel
namespace: knative-eventing
data:
channel-template-spec: |
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
更新 config-br-default-channel
# kubectl apply -f config-br-default-channel.yaml
configmap/config-br-default-channel configured
下载 eventing-kafka-source.yaml
# wget https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.0/eventing-kafka-source.yaml
替换 gcr.io 地址
# sed -i 's@gcr.io@gcr.dockerproxy.com@g' eventing-kafka-source.yaml
部署 eventing-kafka-source
# kubectl apply -f eventing-kafka-source.yaml
configmap/config-kafka-source-data-plane created
clusterrole.rbac.authorization.k8s.io/knative-kafka-source-data-plane created
serviceaccount/knative-kafka-source-data-plane created
clusterrolebinding.rbac.authorization.k8s.io/knative-kafka-source-data-plane created
statefulset.apps/kafka-source-dispatcher created
查看 eventing-kafka-spurces api
# kubectl api-resources |grep -i sources
apiserversources sources.knative.dev/v1 true ApiServerSource
containersources sources.knative.dev/v1 true ContainerSource
gitlabsources sources.knative.dev/v1alpha1 true GitLabSource
kafkasources sources.knative.dev/v1beta1 true KafkaSource
pingsources sources.knative.dev/v1 true PingSource
sinkbindings sources.knative.dev/v1 true SinkBinding
创建 Kafka topic
knative-demo-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: knative-demo-topic
namespace: kafka # 和 kafka server端保持一致
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 5
replicas: 1
config:
retention.ms: 7200000
segment.bytes: 1073741824
创建 knative topic
# kubectl apply -f knative-demo-topic.yaml
kafkatopic.kafka.strimzi.io/knative-demo-topic created
查看 knative topic
# kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 True
knative-demo-topic my-cluster 5 1 True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
连接测试
# kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
If you don't see a command prompt, try pressing enter.
>0
>o
>
没有任何警告提示信息表示topics创建正常。
创建 event-display
event-display.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
namespace: event-kafka-demo
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/min-scale: "1"
spec:
containers:
- image: gcr.dockerproxy.com/knative-releases/knative.dev/eventing/cmd/event_display
生成 Knative Service 资源
# kubectl apply -f event-display.yaml
service.serving.knative.dev/event-display created
查看 ksvc
# kubectl get ksvc -n event-kafka-demo
NAME URL LATESTCREATED LATESTREADY READY REASON
event-display http://event-display.event-kafka-demo.svc.wgs.local event-display-00001 event-display-00001 True
创建 broker
kafka-broker.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: default
namespace: event-kafka-demo
创建 broker 资源
# kubectl apply -f kafka-broker.yaml
broker.eventing.knative.dev/default created
查看 broker
# kubectl get broker -n event-kafka-demo
NAME URL AGE READY REASON
default http://kafka-broker-ingress.knative-eventing.svc.wgs.local/event-kafka-demo/default 67s True
查看 broker 类型
# kubectl describe broker -n event-kafka-demo
Name: default
Namespace: event-kafka-demo
Labels: <none>
Annotations: eventing.knative.dev/broker.class: Kafka
eventing.knative.dev/creator: kubernetes-admin
eventing.knative.dev/lastModifier: kubernetes-admin
API Version: eventing.knative.dev/v1
Kind: Broker
Metadata:
Creation Timestamp: 2023-11-21T09:52:16Z
Finalizers:
brokers.eventing.knative.dev
Generation: 1
Resource Version: 1945640
UID: 3a2c7ae4-ca80-422b-9d41-136032b97a5c
Spec:
Config:
API Version: v1
Kind: ConfigMap
Name: kafka-broker-config
Namespace: knative-eventing
Status:
Address:
Name: http
URL: http://kafka-broker-ingress.knative-eventing.svc.wgs.local/event-kafka-demo/default
Addresses:
Name: http
URL: http://kafka-broker-ingress.knative-eventing.svc.wgs.local/event-kafka-demo/default
Annotations:
bootstrap.servers: my-cluster-kafka-bootstrap.kafka:9092
default.topic: knative-broker-event-kafka-demo-default
default.topic.partitions: 10
default.topic.replication.factor: 1
...
创建 trigger
trigger-event.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: trigger-demo
namespace: event-kafka-demo
spec:
broker: default
#filter:
# attributes:
# type: dev.knative.kafka.event
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
namespace: event-kafka-demo
创建 trigger 资源
# kubectl apply -f trigger-event.yaml
trigger.eventing.knative.dev/trigger-demo created
查看 trigger 资源
# kubectl get trigger -n event-kafka-demo
NAME BROKER SUBSCRIBER_URI AGE READY REASON
trigger-demo default http://event-display.event-kafka-demo.svc.wgs.local 52s True
创建 Kafka event source
event-source.yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
namespace: event-kafka-demo
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: default
namespace: event-kafka-demo
创建 event source
# kubectl apply -f event-source.yaml
kafkasource.sources.knative.dev/kafka-source created
查看 event source
# kubectl get kafkasource -n event-kafka-demo
NAME TOPICS BOOTSTRAPSERVERS READY REASON AGE
kafka-source ["knative-demo-topic"] ["my-cluster-kafka-bootstrap.kafka:9092"] True 16m
kafka event 测试
查看 event-display pod
# kubectl get pods -n event-kafka-demo
NAME READY STATUS RESTARTS AGE
event-display-00001-deployment-7764b69f9-66pnh 2/2 Running 6 (43m ago) 168m
发送消息
# kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap.kafka:9092 --topic knative-demo-topic
If you don't see a command prompt, try pressing enter.
>kafka source to broker to trigger to sink
>{"msg": "kafka source to broker to trigger to sink"}
>
查看 event 输出
# kubectl logs event-display-00001-deployment-7764b69f9-66pnh -c user-container -n event-kafka-demo
☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/event-kafka-demo/kafkasources/kafka-source#knative-demo-topic
subject: partition:4#2
id: partition:4/offset:2
time: 2023-11-21T10:03:13.428Z
Data,
kafka source to broker to trigger to sink
☁️ cloudevents.Event
Context Attributes,
specversion: 1.0
type: dev.knative.kafka.event
source: /apis/v1/namespaces/event-kafka-demo/kafkasources/kafka-source#knative-demo-topic
subject: partition:4#3
id: partition:4/offset:3
time: 2023-11-21T10:04:35.459Z
Data,
{"msg": "kafka source to broker to trigger to sink"}
参考文档
https://knative.dev/docs/eventing/sources/kafka-source/