证书生成
生成证书以及jks参考以下文章
https://blog.csdn.net/qq_41527073/article/details/121148600
证书转换jks -> pem
需要先将 jks 转换成 p12,再由 p12 导出 pem,才能适配 confluent_kafka 包;直接将 jks 转换成 pem 会报错,无法使用。具体参考以下文章:
https://www.ngui.cc/zz/1104321.html?action=onClick
keytool -importkeystore -srckeystore server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
keytool -importkeystore -srckeystore server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
(转换过程)
生成证书目录结构:
python客户端示例代码
import rsa, json
import time, sys
from kafka import KafkaProducer
import confluent_kafka
import ssl
def encrypt(msg):
    """Encrypt a text message with the RSA public key stored in ``public.pem``.

    The key file is opened by relative path (so it is resolved against the
    current working directory) and parsed with ``rsa.PublicKey.load_pkcs1``.
    ``msg`` is UTF-8 encoded before being handed to ``rsa.encrypt``.

    Args:
        msg: Text to encrypt.

    Returns:
        The ciphertext returned by ``rsa.encrypt``.
    """
    with open('public.pem', 'rb') as key_file:
        public_key = rsa.PublicKey.load_pkcs1(key_file.read())
    return rsa.encrypt(msg.encode('utf8'), public_key)
def decrypt(data):
    """Decrypt RSA ciphertext with the private key stored in ``private.pem``.

    The key file is opened by relative path (so it is resolved against the
    current working directory) and parsed with ``rsa.PrivateKey.load_pkcs1``.

    Args:
        data: Ciphertext to pass to ``rsa.decrypt``.

    Returns:
        The decrypted payload decoded as UTF-8 text.
    """
    with open('private.pem', 'rb') as key_file:
        private_key = rsa.PrivateKey.load_pkcs1(key_file.read())
    plaintext = rsa.decrypt(data, private_key)
    return plaintext.decode('utf8')
def _new_producer(context):
    """Create a KafkaProducer for the SASL_SSL listener using *context*."""
    # NOTE(review): per the kafka-python documentation, when ``ssl_context``
    # is supplied the other ``ssl_*`` options (cafile/certfile/keyfile) are
    # ignored -- confirm which of the two configurations is intended.
    return KafkaProducer(
        bootstrap_servers=['172.XX.X.XX:9093'],
        acks=1,
        security_protocol="SASL_SSL",
        ssl_cafile="/Kafka/config/ssl/server.cer.pem",
        ssl_certfile="/Kafka/config/ssl/client.cer.pem",
        ssl_keyfile="/Kafka/config/ssl/client.key.pem",
        ssl_context=context,
        sasl_mechanism="PLAIN",
        sasl_plain_username="kafka",
        sasl_plain_password="XXXXX",
        api_version=(2, 0),
    )


def produce_message():
    """Send the sample message to the ``message_push`` topic five times.

    A producer is created lazily and re-created whenever a send fails.
    Connection attempts are retried once per second until one succeeds, and
    the number of failed attempts is printed before returning. Pressing
    Ctrl-C stops the loop early. The producer is closed on the way out.
    """
    msg = {
        "type": "webclone",
        "api": "delete",
        "state": True,
        "nodename": "node-1",
        "uuid": "asjdkjrh"
    }
    # No value_serializer is configured on the producer, so the payload must
    # already be bytes; passing the str from json.dumps makes send() fail.
    payload = json.dumps(msg).encode('utf8')

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    context.set_ciphers('TLSv1:TLSv1.2')
    # Hostname checking and certificate verification are switched off, so the
    # broker's identity is NOT validated. Only acceptable in a test setup.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    producer = None
    success = 0
    conn_error = 0
    try:
        while success < 5:
            if producer is None:
                try:
                    producer = _new_producer(context)
                except Exception as e:
                    print(e)
                    conn_error += 1
                    time.sleep(1)
                    continue
            try:
                producer.send('message_push', value=payload)
                producer.flush()
                success += 1
            except Exception as e:
                print(e)
                # Drop the failed producer so the next iteration reconnects;
                # closing is best effort because the connection may be dead.
                try:
                    producer.close(timeout=1)
                except Exception as close_error:
                    print(close_error)
                producer = None
    except KeyboardInterrupt:
        # Ctrl-C ends the run early; fall through to cleanup and the summary.
        pass
    finally:
        if producer is not None:
            producer.close()
    print(f"connect error: {conn_error}")
if __name__ == '__main__':
    # Command-line entry point: the first argument selects the mode to run.
    # A missing or unrecognised mode prints the usage line and exits with
    # status 1 instead of silently doing nothing.
    if len(sys.argv) < 2 or sys.argv[1] not in ('produce', 'consume'):
        print("Usage: python3 test.py produce|consume")
        sys.exit(1)
    start_time = time.time()
    if sys.argv[1] == 'produce':
        produce_message()
    else:
        # NOTE(review): consume_message is not defined in the portion of the
        # file shown here -- confirm it exists before relying on this mode.
        consume_message()
    end_time = time.time()
    print(f"start at: {start_time}, end at: {end_time}, cost: {end_time - start_time} seconds")