需求 Kafka 同一个集群实现,内网使用PLAINTEXT协议, 公网使用SASL_SSL。
client --SASL_SSL --> LB (NAT ) ---> kafka_broker client ---PLAINTEXT ----> kafka_broker
先配置PLAINTEXT 实现集群正常 install broker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 sudo chown ubuntu:ubuntu -R /data sudo apt install openjdk-17-jdk -y # download jdk https://jdk.java.net/archive/ cd /data/ wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz # !/bin/bash sudo chown ubuntu:ubuntu -R /data sudo apt-get update sudo apt install openjdk-17-jdk -y cd /data/ wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz tar -zxvf kafka_2.13-3.8.0.tgz ln -s kafka_2.13-3.8.0 kafka-bin # config zk # NODES=("10.10.157.141" "10.10.148.147" "10.10.149.188" ) export KAFKA_BROKER_DIR=/data/kafka-bin # mkdir $ZOOKEEPER_DIR /confcd $KAFKA_BROKER_DIR # step 1 # config properties cp config/server.properties config/server.properties.bak broker_server_config="config/server.properties" broker_id="1" sed -i "s/localhost:2181/10.10.157.141:2181,10.10.148.147:2181,10.10.149.188:2181/g" $broker_server_config sed "s/^broker.id=.*/broker.id=${broker_id}/g" $broker_server_config # config kafka broker data dir mkdir /data/kafka-data # sed "s/log.dirs\=\/tmp\/kafka-logs/log.dirs\=\/data\/kafka-data/g" $broker_server_config | grep "log.dir" sed -i "s/log.dirs\=\/tmp\/kafka-logs/log.dirs\=\/data\/kafka-data/g" $broker_server_config sewd -i # step 2 # config jvm old_string="1G" new_string="3G" file_path="./bin/kafka-server-start.sh" sed -i "s/${old_string}/${new_string}/g" "$file_path" # step3 # config service sudo bash -c 'cat > /usr/lib/systemd/system/kafka.service << EOF [Unit] Description=Apache Kafka Service After=network.target After=time-sync.target network-online.target Wants=time-sync.target [Service] Type=simple User=ubuntu Group=ubuntu ExecStart=/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties ExecStop=/data/kafka/bin/kafka-server-stop.sh Restart=always RestartSec=30 TimeoutStopSec=300 LimitNOFILE=65536 StandardOutput=null StandardError=null [Install] WantedBy=multi-user.target EOF' # config jmx cd /data/kafka mkdir prometheus cd prometheus wget https://repo.maven.apache.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/1.0.1/jmx_prometheus_javaagent-1.0.1.jar bash -c ' cat > kafka-2_0_0.yml << EOF rules: - pattern: kafka.server<type=(\w+), name=(\w+)(, topic=(\w+))?.*> name: kafka_server_$1_$2 labels: topic: "$4" - pattern: kafka.network<type=(\w+), name=(\w+), request=(\w+),.*> name: kafka_network_$1_$2 labels: request: "$3" - pattern: kafka.network<type=RequestMetrics, name=(\w+), request=(\w+),.*> name: kafka_request_metrics_$1 labels: request: "$2" EOF' cd /data/kafka sed -i '16a\ export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/data/kafka/prometheus/jmx_prometheus_javaagent-1.0.1.jar=7071:/data/kafka/prometheus/kafka-2_0_0.yml" ' bin/kafka-server-start.sh sudo systemctl daemon-reload sudo systemctl restart kafka sudo systemctl status kafka sudo systemctl enable kafka sudo systemctl status kafka
test # show log tail -f logs/server.log
配置 SASL_SSL 协议 JKS证书生成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 mkdir security cd security/ sudo openssl req -new -x509 -keyout /tmp/snakeoil-ca-1. key -out /tmp/snakeoil-ca-1. crt -days 365 -subj '/CN=ca1.test.confluent.io/OU=TEST/O=CONFLUENT/L=PaloAlto/ST=Ca/C=US' -passin pass :confluent -passout pass :confluent keytool -genkeypair -alias kafka-broker -keyalg RSA -keysize 2048 \ -validity 3650 -keystore kafka-broker-keystore.jks -dname "CN=b8c4-fa18b654c0b2-data.my-cluster-prod.com, OU=my-cluster-prod, O=my-cluster-prod, L=Los_Angeles, ST=California, C=US" \ -storepass pwdXX2024P87d98ac9wz -keypass oker2024P87d98ac9wz keytool -export -alias kafka-broker -file kafka-broker-cert.crt -keystore kafka-broker-keystore.jks -storepass pwdXX2024P87d98ac9wz keytool -list -v -keystore kafka-broker-keystore.jks -storepass pwdXX2024P87d98ac9wz keytool -import -alias kafka-broker -file kafka-broker-cert.crt -keystore truststore.jks -storepass pwdXX2024P87d98ac9wz keytool -list -v -keystore truststore.jks -storepass truststore_password keytool -list -v -keystore truststore.jks -storepass pwdXX2024P87d98ac9wz # load keytool -import -trustcacerts -file broker-cmm-certificate.crt -keystore broker-cmm-truststore.jks -storetype JKS -storepass pwdPassword8e95-942e564afa33 -alias pwdPassword8e95-942e564afa33
配置broker sasl_ssl 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 broker.id =1 #需要配置 listeners PLAINTEXT + SASL_SSL listeners=PLAINTEXT : #需要配置 advertised.listeners PLAINTEXT + SASL_SSL advertised.listeners =PLAINTEXT : listener.security .protocol .map =PLAINTEXT :PLAINTEXT ,SASL_SSL :SASL_SSL #broker内部通信使用PLAINTEXT security.inter .broker .protocol =PLAINTEXT sasl.mechanism .inter .broker .protocol =PLAIN security.protocol =SASL_SSL sasl.enabled .mechanisms =PLAIN ssl.keystore .location =/data/ kafka-bin/security/broker-cmm-keystore.jks ssl.keystore .password =xxx-942e564afa33 ssl.key .password =xxx-942e564afa33 ssl.truststore .location =/data/ kafka-bin/security/broker-cmm-truststore.jks ssl.truststore .password =xxxx-942e564afa33 ssl.endpoint .identification .algorithm = # SSL config 验证client 证书 ssl.client .auth =required num.network .threads =6 num.io .threads =12 socket.send .buffer .bytes =524288 socket.receive .buffer .bytes =524288 socket.request .max .bytes =104857600 log.dirs =/data/ kafka-data num.partitions =1 num.recovery .threads .per .data .dir =1 offsets.topic .replication .factor =1 transaction.state .log .replication .factor =1 transaction.state .log .min .isr =1 log.retention .hours =168 log.retention .check .interval .ms =300000 compression.type =zstd zookeeper.connect =10.21 .100 .88 :2181 ,10.21 .100 .46 :2181 ,10.21 .100 .211 :2181 /prod-kafka-cluster-bigdata zookeeper.connection .timeout .ms =18000 group.initial .rebalance .delay .ms =0
配置 servcie 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 [Unit ] Description =Apache Kafka Service After =network.target After =time-sync.target network-online.target Wants =time-sync.target [Service ] Type =forkingUser =ubuntuGroup =ubuntuRestart =on-failureRestartSec =5 TimeoutStopSec =300 LimitNOFILE =260000 LimitFSIZE =infinityEnvironment ="KAFKA_OPTS= -Djava.security.auth.login.config=/data/kafka-bin/security/kafka_server_jaas.conf -javaagent:/data/kafka-bin/prometheus/jmx_prometheus_javaagent-1.0.1.jar=10071:/data/kafka-bin/prometheus/kafka_broker.yml" Environment ="KAFKA_HEAP_OPTS=-Xms8G -Xmx8G " ExecStart =/data/ kafka-bin/bin/kafka-server-start.sh -daemon /data/kafka-bin/config/server.properties [Install ] WantedBy =multi-user.target
client test ./bin/kafka-console -consumer.sh --bootstrap-server node01.kafka -broker-data.my -host-pord.com :19096 ,node02.kafka -broker-data. my-host-pord.com :19096 ,node03.kafka -broker-data. my-host-pord.com :19096 --topic source.pro -pro-connect-offsets-v2 --consumer.config kafka-client-sasl-ssl-plain.properties --partition 0 --property print.value =true --from -beginning