flink1.16+连接Elasticsearch7官方例子报错解决方案

news2024/11/24 17:10:17

flink输出至es,连接es7时使用的sink类是Elasticsearch7SinkBuilder,与es6链接方式不同,官网地址:fink连接es官方地址。

flink连接es7官方示例

依赖jar:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>3.0.1-1.16</version>
</dependency>

官方连接es7代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}

本地运行报错

java.lang.IllegalStateException: The elasticsearch emitter must be serializable.
错误提示很明确:emitter没有序列化,点击查看ElasticsearchEmitter的源码其实最终是有继承序列化接口的,那就猜测是使用lambda方式不能自己实现序列化,那就自己写个实现类ElasticsearchEmitter接口然后显示序列化一次。

修改后代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter(
        (element, context, indexer) ->
        indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}

//自己新增类继承ElasticsearchEmitter,实现序列化接口
    static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {

        @Override
        public void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }

    }

启动后验证成功。

flink连接es7抽出完成工具类


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.*;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.io.Serializable;

@Slf4j
public class Es7SinkFactory {

    public static Sink createSink(String host, Integer port, String schema, String user, String pwd, String index) {
        try {

            ElasticsearchSink<String> elasticsearchSink = new Elasticsearch7SinkBuilder<String>()
                    .setHosts(new HttpHost(host, port, schema))
                    .setConnectionUsername(user)
                    .setConnectionPassword(pwd)
                    .setEmitter(new MyElasticsearchEmitter(index))
                    .setBulkFlushMaxSizeMb(5)//最大缓冲大小
                    .setBulkFlushInterval(5000)//刷新间隔
                    .setBulkFlushMaxActions(1000)//批量插入最大条数
                    .setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 5, 100)
                    .build();

            return elasticsearchSink;
        } catch (Exception e) {
            log.error("Es7SinkFactory.createSink error e=", e);
        }

        return null;
    }

    private static IndexRequest createIndexRequest(String element, String index) {
        return Requests.indexRequest()
                .index(index)
                .source(JSON.parseObject(element));
    }


    //匿名内部类/lamda表达式未集成序列化接口,需要实现序列化接口
    static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {

        private String index;

        MyElasticsearchEmitter(String index) {
            this.index = index;
        }

        @Override
        public void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element, index));
        }

    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2246773.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL-多表操作

前文所介绍的sql操作都是基于单表进行的&#xff0c;接下来我们来学习多表操作。 多表设计 在实际的项目开发中&#xff0c;会根据业务需求和业务模块之间的关系进行数据库表结构设计&#xff0c;由于业务之间相互关联&#xff0c;所以各个表结构之间也存在着各种联系&#xf…

c++ STL线程安全使用

c STL不是线程安全的&#xff0c;因此在多线程中使用的时候&#xff0c;操作同一个容器&#xff0c;会崩溃&#xff0c;因此需要解决线程安全的问题&#xff1a; 使用实例类似于以下&#xff1a; #include <thread> #include <vector> #include "thread_safe…

Swift 实现判断链表是否存在环:快慢指针法

文章目录 前言摘要描述题解答案题解代码题解代码分析示例测试及结果时间复杂度空间复杂度总结关于我们 前言 本题由于没有合适答案为以往遗留问题&#xff0c;最近有时间将以往遗留问题一一完善。 LeetCode - #141 环形链表 不积跬步&#xff0c;无以至千里&#xff1b;不积小流…

SpringCloud实用-OpenFeign 调用三方接口

文章目录 前言正文一、项目环境二、项目结构2.1 包的含义2.2 代理的场景 三、完整代码示例3.1 定义FeignClient3.2 定义拦截器3.3 配置类3.4 okhttp配置3.5 响应体3.5.1 天行基础响应3.5.2 热点新闻响应 3.6 代理类3.6.1 代理工厂3.6.2 代理客户端3.6.3 FeignClient的建造器 四…

C++设计模式行为模式———中介者模式

文章目录 一、引言二、中介者模式三、总结 一、引言 中介者模式是一种行为设计模式&#xff0c; 能让你减少对象之间混乱无序的依赖关系。 该模式会限制对象之间的直接交互&#xff0c; 迫使它们通过一个中介者对象进行合作。 中介者模式可以减少对象之间混乱无序的依赖关系&…

HarmonyOS:使用ArkWeb构建页面

一、简介 页面加载是Web组件的基本功能。根据页面加载数据来源可以分为三种常用场景&#xff0c;包括加载网络页面、加载本地页面、加载HTML格式的富文本数据。 页面加载过程中&#xff0c;若涉及网络资源获取&#xff0c;需要配置ohos.permission.INTERNET网络访问权限。 二、…

矩阵的拼接

矩阵的拼接分为横向拼接和纵向拼接 注意&#xff1a;横向拼接要求两矩阵行数相同&#xff0c;纵向拼接要求两矩阵列数相同 h o r z c a t horzcat horzcat和 v e r t c a t vertcat vertcat函数 h o r z c a t ( a , b ) horzcat(a,b) horzcat(a,b)将 a a a和 b b b横向拼接&a…

SpringCloud框架学习(第五部分:SpringCloud Alibaba入门和 nacos)

目录 十二、SpringCloud Alibaba入门简介 1. 基本介绍 2.作用 3.版本选型 十三、 SpringCloud Alibaba Nacos服务注册和配置中心 1.简介 2.各种注册中心比较 3.下载安装 4.Nacos Discovery服务注册中心 &#xff08;1&#xff09; 基于 Nacos 的服务提供者 &#xf…

Ollama vs VLLM:大模型推理性能全面测评!

最近在用本地大模型跑实验&#xff0c;一开始选择了ollama,分别部署了Qwen2.5-14B和Qwen2.5-32B&#xff0c;发现最后跑出来的实验效果很差&#xff0c;一开始一直以为prompt的问题&#xff0c;尝试了不同的prompt&#xff0c;最后效果还是一直不好。随后尝试了vllm部署Qwen2.5…

.NET9 - 新功能体验(一)

被微软形容为“迄今为止最高效、最现代、最安全、最智能、性能最高的.NET版本”——.NET 9已经发布有一周了&#xff0c;今天想和大家一起体验一下新功能。 此次.NET 9在性能、安全性和功能等方面进行了大量改进&#xff0c;包含了数千项的修改&#xff0c;今天主要和大家一起体…

LeetCode 144.二叉树的前序遍历

题目&#xff1a;给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 思路&#xff1a;根 左 右 代码&#xff1a; /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNod…

【论文阅读】WGSR

0. 摘要 0.1. 问题提出 1.超分辨率(SR)是一个不适定逆问题&#xff0c;可行解众多。 2.超分辨率(SR)算法在可行解中寻找一个在保真度和感知质量之间取得平衡的“良好”解。 3.现有的方法重建高频细节时会产生伪影和幻觉&#xff0c;模型区分图像细节与伪影仍是难题。 0.2. …

游戏引擎学习第21天

虽然没有上一节的难但是内容也很多 关于实现和使用脚本语言 以下是详细复述&#xff1a; 许多人经常问一个问题&#xff0c;反复问过好几次&#xff0c;那就是&#xff1a;是否会在项目中实现脚本语言。这个问题的具体形式通常是&#xff1a;你们会使用脚本语言吗&#xff1…

NVR接入录像回放平台EasyCVR视频融合平台加油站监控应用场景与实际功能

在现代社会中&#xff0c;加油站作为重要的能源供应点&#xff0c;面临着安全监管与风险管理的双重挑战。为应对这些问题&#xff0c;安防监控平台EasyCVR推出了一套全面的加油站监控方案。该方案结合了智能分析网关V4的先进识别技术和EasyCVR视频监控平台的强大监控功能&#…

springboot vue工资管理系统源码和答辩PPT论文

人类现已迈入二十一世纪&#xff0c;科学技术日新月异&#xff0c;经济、资讯等各方面都有了非常大的进步&#xff0c;尤其是资讯与网络技术的飞速发展&#xff0c;对政治、经济、军事、文化等各方面都有了极大的影响。 利用电脑网络的这些便利&#xff0c;发展一套工资管理系统…

部署实战(二)--修改jar中的文件并重新打包成jar文件

一.jar文件 JAR 文件就是 Java Archive &#xff08; Java 档案文件&#xff09;&#xff0c;它是 Java 的一种文档格式JAR 文件与 ZIP 文件唯一的区别就是在 JAR 文件的内容中&#xff0c;多出了一个META-INF/MANIFEST.MF 文件META-INF/MANIFEST.MF 文件在生成 JAR 文件的时候…

RabbitMQ4:work模型

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

SpringMVC——简介及入门

SpringMVC简介 看到SpringMVC这个名字&#xff0c;我们会发现其中包含Spring&#xff0c;那么SpringMVC和Spring之间有怎样的关系呢&#xff1f; SpringMVC隶属于Spring&#xff0c;是Spring技术中的一部分。 那么SpringMVC是用来做什么的呢&#xff1f; 回想web阶段&#x…

鸿蒙开发-文件与分享

文件分类 按所有者&#xff1a; 应用文件&#xff1a;所有者为应用&#xff0c;包括应用安装文件、应用资源文件、应用缓存文件等。 用户文件&#xff1a;所有者是登录到该终端设备的用户&#xff0c;包括用户私有的图片、视频、音频、文档等。 系统文件&#xff1a;与应用和…

内存级文件原理——Linux

目录 进程与文件 Linux下的文件系统 文件操作&#xff0c;及文件流 C语言函数 文件流 文件描述符 系统调用操作 系统调用参数 重定向与文件描述符 输出重定向 输入重定向 文件内容属性 Linux下一切皆文件 进程与文件 当我们对文件进行操作时&#xff0c;文件必…