kafka客户端调用
springboot整合kafka java调用kafka 其他问题
springboot整合kafka
手动提交需要在配置文件配置kafka属性 kafka.listener.ack-mode: manual
@Component
public class MyKafkaListener {
@Autowired
private SaslClient saslClient;
@KafkaListener ( topics = { "主题" } , groupId = "消费组" )
public void onMessage ( ConsumerRecord < String , String > record, Acknowledgment ack) {
try {
saslClient. consume ( record) ;
} catch ( Exception e) {
e. printStackTrace ( ) ;
} finally {
ack. acknowledge ( ) ;
}
}
}
yml增加配置kafka :
listener :
ack-mode : manual
bootstrap-servers :
consumer :
isolation-level : read- committed
enable-auto-commit : false
auto-offset-reset : earliest
key-deserializer : org.apache.kafka.common.serialization.StringDeserializer
value-deserializer : org.apache.kafka.common.serialization.StringDeserializer
max-poll-records : 2
properties :
security :
protocol : SASL_PLAINTEXT
sasl :
mechanism : SCRAM- SHA- 512
jaas :
config : org.apache.kafka.common.security.scram.ScramLoginModule required username="用户" password="密码";
session :
timeout :
ms : 24000
max :
poll :
interval :
ms : 30000
java调用kafka
@Value ( "${kafakaData.topic}" )
private String topic;
@Value ( "${kafkaData.group}" )
private String group;
@Value ( "${kafkaData.jaas}" )
private String jaas;
@Value ( "${kafkaData.key}" )
private String key;
@Value ( "${kafkaData.brokers}" )
private String brokers;
public void consume ( ) throws Exception {
Properties properties = new Properties ( ) ;
properties. put ( "security.protocol" , "SASL_PLAINTEXT" ) ;
properties. put ( "sasl.mechanism" , "SCRAM-SHA-512" ) ;
properties. put ( "bootstrap.servers" , brokers) ;
properties. put ( "group.id" , group) ;
properties. put ( "enable.auto.commit" , "false" ) ;
properties. put ( "auto.offset.reset" , "earliest" ) ;
properties. put ( "max.poll.records" , 2 ) ;
properties. put ( "key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ) ;
properties. put ( "value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ) ;
properties. put ( "sasl.jaas.config" , jaas) ;
KafkaConsumer < String , String > consumer = new KafkaConsumer < > ( properties) ;
consumer. subscribe ( Arrays . asList ( topic) ) ;
while ( true ) {
ConsumerRecords < String , String > records = consumer. poll ( Duration . ofMillis ( 3000 ) ) ;
System . out. printf ( "poll records size = %d%n" , records. count ( ) ) ;
try {
for ( ConsumerRecord < String , String > record : records) {
String publicDecrypt = RSAUtil . publicDecrypt ( record. value ( ) , RSAUtil . getPublicKey ( key) ) ;
JSONObject jsonObject = JSONObject . parseObject ( publicDecrypt) ;
String msg = jsonObject. getString ( "msg" ) ;
String page = jsonObject. getString ( "page" ) ;
String size = jsonObject. getString ( "size" ) ;
String time = jsonObject. getString ( "time" ) ;
String total = jsonObject. getString ( "total" ) ;
String type = jsonObject. getString ( "type" ) ;
String operation = jsonObject. getString ( "operation" ) ;
}
} catch ( Exception e) {
e. printStackTrace ( ) ;
} finally {
consumer. commitAsync ( ) ;
}
}
}
其他问题
每次消费一条数据必须提交,否则会影响分区,导致偏移量错位,后面就消费不到数据了