flink启动报错Failed to construct kafka producer

news2025/1/22 18:05:59

flink local模式下启动 sink2kafka报错,具体报错如下

apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:56)
......................
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)

提取报错信息

Failed to construct kafka producer

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

代码

flink版本是14.6

kafkaProperties里存的是kafka的信息

   println(s"========kafka properties========\r\n$kafkaProperties");
    val broker: String = kafkaProperties.getProperty("broker")
    val topic: String = kafkaProperties.getProperty("topic")
    val kafkaSink: KafkaSink[String] = KafkaSink.builder()
      .setBootstrapServers(broker)
      .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
          .setTopic(topic)
          .setValueSerializationSchema(new SimpleStringSchema())
          .build()
      )
      .setKafkaProducerConfig(kafkaProperties)
      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build();
    data.map(record=>JacksonManager.mapper.writeValueAsString(record))
      .sinkTo(kafkaSink).name("sink2kafka")

本地起了一个sink2kafka的demo 也没问题,但是在服务器启动的时候就报错了,试了多次无果,开始分析报错原因。

我们要sink2kafka,那么flink肯定根据我们的kafka信息创建一个kafkaProducer

对应的报错,这里是kafkaProducer的构造器init失败了

org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)

那么为什么init失败了呢?因为这个类ByteArraySerializer 不是Serializer 的实例

class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

那么这个为什么不是实例呢?我们在idea里看下

package org.apache.kafka.common.serialization;

public class ByteArraySerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}

这里明明就是,为啥说不是啊。。。需要思考下。

当时我最开始就考虑是jar包冲突,再看下是否冲突,突然想到一个问题,项目中的有两个人

a喜欢打非依赖的jar的包,也就是flink的jar都不打进去,全放到服务器的flink_home/jar里

b喜欢打全依赖的jar包,也就是所有flink的jar都打进去,然后执行。

目前是b的工程,那么会不会是jar冲突了,是自己工程冲突了 还是打的jar和flink_home/jar里的jar冲突了?

先看工程

 然后我看了服务器的

那么原因就出来的,排除多余的jar。就正常启动了 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/748471.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JDBC】Java连接MySQL数据库

文章目录 一、数据库编程二、Java数据库编程JDBC2.1 什么是JDBC2.2 JDBC的工作原理 三、JDBC基本操作3.1 JDBC API3.2 数据库连接Connection3.3 Statement对象3.4 ResultSet对象 四、应用案例 一、数据库编程 数据库编程指的是通过编程语言与数据库进行交互和操作的过程&#…

uniapp 一些常用的公共方法

封装代码可以看这篇文章: uniapp 封装公共方法(无需每个页面引用,直接可以调用)_前端小胡兔的博客-CSDN博客uniapp 封装公共方法https://blog.csdn.net/weixin_44805839/article/details/131684296?spm1001.2014.3001.5501 常用方法: 自定义返回页面 (直接使用uni.naviga…

DuiLib中的list控件以及ListContainerElement控件

文章目录 前言1、创建list控件2、创建 ListContainerElement 元素&#xff0c;并添加到 List 控件中,这里的ListContainerElement用xml来表示3、在 ListContainerElement 元素中添加子控件 1、List控件2、ListContainerElement控件 前言 在 Duilib 中&#xff0c;List 控件用于…

【微服务】集成其他已有的模块

目录 下载新的模块信息删除git信息将已有模块复制到当前项目里面在父pom文件中&#xff0c;加上复制进的模块重新解析结果 下载新的模块信息 删除git信息 将已有模块复制到当前项目里面 在父pom文件中&#xff0c;加上复制进的模块 重新解析 结果 集成完成

第二十四章:索引的数据结构

第二十四章&#xff1a;索引的数据结构 24.1&#xff1a;为什么使用索引 ​ 索引是存储引擎用于快速找到数据记录的一种数据结构&#xff0c;就好比一本教课书的目录部分&#xff0c;通过目录中找到对应文章的页码&#xff0c;便可快速定位到需要的文章。MySQL中也是一样的道…

Java实现图片与Base64编码互转

Java实现图片与Base64编码互转 淘宝里面的html用base64转换图片&#xff0c;不知道为什么&#xff0c;不过看起来好像很美好&#xff0c;话不多说&#xff0c;直接上代码&#xff1a; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOE…

Effective Java笔记(6)避免创建不必要的对象

一般来说&#xff0c;最好能重用单个对象&#xff0c;而不是在每次需要 的时候就创建一个相同功能的新对象 。 重用方式既快速&#xff0c;又流行 。 如果对象是不可变的&#xff08; immutable ) &#xff08;详见第 17 条&#xff09;&#xff0c;它就始终可以被重用 。 作为…

Golang gui walk入门教程(一)安装walk环境

一、golang环境 Go 1.11.x or later 二、安装walk go get github.com/lxn/walk 三、安装rsrc 运行walk程序需要manifest&#xff0c;rsrc提供了这个功能 go install github.com/akavel/rsrc 安装完成后在GOPATH的bin下面会有一个rsrc.exe的可执行文件 在idea的termial输入r…

Tauri 提供界面 + 使用 Rust 实现连接远程 Linux 服务器、发送文件、执行命令

Tauri 提供界面 使用 Rust 实现连接远程 Linux 服务器、发送文件、执行命令 文章目录 Tauri 提供界面 使用 Rust 实现连接远程 Linux 服务器、发送文件、执行命令一、Tauri 概述二、界面预览三、代码参考1、main.rs2、App.vue3、Greet.vue4、依赖 一、Tauri 概述 Tauri 是一…

C语言动态获取设备的网络接口名称和状态以及对应的IP地址

一、目的 在实际项目中需要获取设备的IP地址然后通过广播的形式通知局域网内的其他设备。 二、介绍 方法一 通过ioctl方式获取SIOCGIFADDR信息 /** C Program to Get IP Address*/ #include <stdio.h> #include <string.h> #include <sys/types.h> #includ…

mfc120u.dll丢失修复,mfc120u.dll缺失的解决方法

MFC120u.dll缺失的原因 当系统中缺少或损坏了MFC120u.dll文件时&#xff0c;就会出现"MFC120u.dll缺失"的错误提示。造成MFC120u.dll缺失的原因可能有以下几种情况&#xff1a; 1.文件删除或损坏&#xff1a;MFC120u.dll文件可能因为误删除、病毒感染、硬盘故障等原…

pearsonr报错:计算结果为nan、warning .warn (stats.constantinputwarning (msg))

【1】两个都是ndarry 最后结果为nan&#xff1a; &#xff08;1&#xff09;数据类型转换&#xff1a;都转为一样的float32&#xff1a;依旧报错nan &#xff08;2&#xff09;进入函数内部debug&#xff1a; if (xx[0]).all() or (yy[0]).all() warning .warn (stats.consta…

13-接口、代码、命令如何测试?

可以通过工具&#xff0c;也可以通过代码来进行测试。 这里使用工具Postman。 1.针对接口进行测试 粘贴导入到Postman中&#xff1a; 也可直接复制URL&#xff1a; 粘贴到Postman中&#xff1a; 没有对应的参数&#xff0c;那么可以直接请求&#xff1a; 这个接口的GET、POST…

无法将“pip“识别为cmdlet、函数、脚本文件或可运行程序的名称。

出现问题如下&#xff1a; 出现问题原因&#xff1a; 没有添加pip对应的安装目录进入环境变量里面的系统变量。 解决方案&#xff1a; 1.确定python的安装路径 将python的路径添加到系统变量中 2.输入pip所在的安装路径&#xff1a; python路径\Lib\site-packages 3.添加…

PyTorch训练RNN, GRU, LSTM:手写数字识别

文章目录 pytorch 神经网络训练demoResult参考来源 pytorch 神经网络训练demo 数据集&#xff1a;MNIST 该数据集的内容是手写数字识别&#xff0c;其分为两部分&#xff0c;分别含有60000张训练图片和10000张测试图片 图片来源&#xff1a;https://tensornews.cn/mnist_intr…

eureka使用错误

错误 java.net.UnknownHostException: INVENTORYSERVICE 分析&解决&#xff1a; 这里的请求执行错误 但eureka可以找到服务 手动创建RestTemlate到容器中&#xff0c;未加LoadBalanced注解 加上注解后重试&#xff0c;成功

java详细显示try/catch块的异常类方法等信息

示例&#xff1a; Testpublic void testException(){try {double theorynumDouble Double.parseDouble(null);} catch (Exception e) {StackTraceElement[] stackTraceElements e.getStackTrace();StackTraceElement stackTraceElementFirst stackTraceElements[0];String c…

一文教你学会Linux数组

目录 &#x1f380;什么是数组&#xff1f; &#x1f380;数组优点 &#x1f380;数组缺点 &#x1f380;定义数组 &#x1f380;数组的取值 &#x1f380;一次取出数组所有的值 &#x1f380;数组长度&#xff1a; 即数组元素个数 &#x1f380;数组的截取&#xff…

【ASP.NET】医学实验室管理(LIS)系统源码

一、医学实验室LIS系统概况 LIS&#xff08;全称Laboratory Information Management System&#xff09;&#xff0c;是专为医院检验科设计的一套实验室信息管理系统&#xff0c;能将实验仪器与计算机组成网络&#xff0c;使病人样品登录、实验数据存取、报告审核、打印分发&am…

MySQL数据库的索引原理与慢SQL优化的5大原则

这篇文章主要介绍了MySQL数据库的索引原理与慢SQL优化的5大原则,包括&#xff1a;建立索引的原则&#xff0c;慢查询优化基本步骤&#xff0c;慢查询优化案例&#xff0c;explain使用&#xff0c;需要的朋友可以参考下 我们知道一般的应用系统&#xff0c;读写比例在10:1左右&…