flink自定义process,使用状态求历史总和(scala)

news2024/9/20 6:06:44

es idea maven 依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>


import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}

import java.time.Duration
import java.util.Properties

object Test {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //需要状态开启下面的配置
    //env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))//hdfs 作为状态后端
    //env.enableCheckpointing(10 * 60 * 1000L)
    //env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间
    val props = new Properties
    props.setProperty("bootstrap.servers", "host:6667") //有些是9092端口
    props.setProperty("group.id", "groupId")
    props.setProperty("retries", "10")
    props.setProperty("retries.backoff.ms", "100")
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")
    //是否配置了权限,有的话加上下面的配置
    // props.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")
    //props.setProperty("security.protocol", "SASL_PLAINTEXT");
    // props.setProperty("sasl.mechanism", "PLAIN")
    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("esIPOne", 9200, "http"))
    httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))
    httpHosts.add(new HttpHost("esIPThree", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](httpHosts, new ElasticsearchSinkFunction[ResultBean] {
      def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer) {
        val json = new java.util.HashMap[String, Any]
        json.put("@timestamp", element.ts)
        json.put("data", element.data)
        json.put("sum", element.sum)
        val rqst = Requests.indexRequest()
          .index("indexName")
          .id(element.id)
          .source(json)
          .opType(DocWriteRequest.OpType.INDEX)

        indexer.add(rqst)
      }
    })
    setESConf(esSinkBuilder, 5000)
    val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props)
      .setStartFromEarliest() //从什么时间开始读

    val source = env
      .addSource(myConsumer)
      .uid("source-data")
      .name("数据源")
      .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1))
        .withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {
          override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts
        }).withIdleness(Duration.ofSeconds(5)))
      .uid("water-marks")
      .name("注册水位线")

    source
      .keyBy(k => k.id)
      .process(new DemoProcess())
      .uid("demo-process")
      .name("process 示例")
      .addSink(esSinkBuilder.build())
      .uid("es-sink")
      .name("数据写入es")

    env.execute("任务名")
  }

  private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {
    override def isEndOfStream(t: DemoBean): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {
      val value = new String(consumerRecord.value())
      val list = value.split("\t")
      DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)
    }

    override def getProducedType: TypeInformation[DemoBean] = TypeExtractor.getForClass(classOf[DemoBean])
  }

  private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {
    private var hisSumState: ValueState[Int] = _

    override def open(parameters: Configuration): Unit = {
      hisSumState = getRuntimeContext.getState(new ValueStateDescriptor("his-sum", classOf[Int]))
    }

    override def processElement(data: DemoBean, ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context, out: Collector[ResultBean]): Unit = {
      val his = if (hisSumState.value() == null) 0 else hisSumState.value()
      val now = data.value
      hisSumState.update(now)
      out.collect(ResultBean(data.id, data.data, his + now, data.value))
    }
  }

  def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int) {
    esSinkBuilder.setBulkFlushMaxActions(numMaxActions)
    esSinkBuilder.setBulkFlushMaxSizeMb(10)
    esSinkBuilder.setBulkFlushInterval(10000)
    esSinkBuilder.setBulkFlushBackoff(true)
    esSinkBuilder.setBulkFlushBackoffDelay(2)
    esSinkBuilder.setBulkFlushBackoffRetries(3)
    esSinkBuilder.setRestClientFactory(new RestClientFactory {
      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
          override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {
            requestConfigBuilder.setConnectTimeout(12000)
            requestConfigBuilder.setSocketTimeout(90000)
          }
        })
      }
    })
  }

  private case class DemoBean(id: String, data: String, value: Int, ts: Long)

  private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2148180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android中的引用类型:Weak Reference, Soft Reference, Phantom Reference 和 WeakHashMap

在Android开发中&#xff0c;内存管理是一个非常重要的话题。为了更好地管理内存&#xff0c;Java和Android提供了多种引用类型&#xff0c;包括Weak Reference、Soft Reference、Phantom Reference以及WeakHashMap。这些引用类型在不同的场景下可以帮助我们更有效地管理内存&a…

(笔记)mac笔记本调节键盘速率

我在使用neovim的时候&#xff0c;发现按下hjkl或者shift[]来进行移动的时候 开始延迟大概几百毫秒的时间才开始移动 所以我上网找了下方法 发现修改这了可以改变速率 我就直接拉到了fast 芜湖 起飞 local opt vim.opt local o vim.o local g vim.go.timeoutlen 100 o…

论文速递!时序预测!DCSDNet:双卷积季节性分解网络,应用于天然气消费预测过程

本期推文将介绍一种新的时序预测方法:双卷积季节性分解网络&#xff08;Dual Convolution withSeasonal Decomposition Network, DCSDNet&#xff09;在天然气消费预测的应用&#xff0c;这项研究发表于《Applied Energy》期刊。 针对天然气消费的多重季节性和非规律性&#x…

汽车焊机数据通信:Profinet转Canopen网关的神奇连接

在汽车制造领域&#xff0c;汽车焊机的高效、稳定运行对于整车质量至关重要。而Profinet转Canopen网关在汽车焊机的数据通信中发挥着关键作用。 Profinet是一种广泛应用于工业自动化领域的通信协议&#xff0c;具有高速、实时、可靠等特点。Canopen则在汽车电子等领域有着广泛…

软件渗透测试流程有哪些?专业软件测评公司简析渗透测试的好处

软件渗透测试是进行软件安全测评的重要环节&#xff0c;旨在通过模拟攻击手段发现软件系统的脆弱性。这种安全测试方法能够帮助开发人员和系统管理员发现并修复潜在的安全漏洞&#xff0c;以确保软件系统的安全性和完整性。软件渗透测试是一项高度技术性的任务&#xff0c;需要…

口哨声、歌声、boing声和biotwang声:用AI识别鲸鱼叫声

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

算法打卡 Day41(动态规划)-理论基础 + 斐波那契数 + 爬楼梯 + 使用最小花费爬楼梯

文章目录 理论基础Leetcode 509-斐波那契数题目描述解题思路 Leetcode 70-爬楼梯题目描述解题思路 Leetcode 746-用最小花费爬楼梯题目描述解题思路 理论基础 动态规划&#xff0c;简称 DP&#xff0c;其中的每一个状态一定是由上一个状态推导出来的&#xff0c;而贪心算法没有…

Mastering Qt 番外 —— 添加源码调试

笔者最近正在尝试深入的学习Qt框架&#xff0c;经常需要明确我经常使用的类底下发生了什么&#xff0c;因此笔者决定仔细研究一下如何进行源码级别的调试 此篇文章将会介绍如何使用Qt Creator这个IDE进行调试。最终效果如下 EasyWay 笔者采用的是这个最简单明了的方式&#xff…

回归预测|基于鹈鹕优化径向基神经网络的数据回归预测Matlab程序POA-RBF 多特征输入单输出 含基础RBF

回归预测|基于鹈鹕优化径向基神经网络的数据回归预测Matlab程序POA-RBF 多特征输入单输出 含基础RBF 文章目录 一、基本原理1. **饥饿游戏搜索优化算法&#xff08;POA&#xff09;简介**2. **径向基神经网络&#xff08;RBF&#xff09;简介**3. **POA-RBF回归预测流程**1. **…

重修设计模式-设计原则

重修设计模式-设计原则 设计原则 设计原则是软件编码时所遵循的规则&#xff0c;旨在帮助开发者创建出既满足功能需求又易于维护、可扩展且美观的设计&#xff0c;理解设计原则可以提升代码质量、减少错误以及促进团队协作&#xff0c;但对设计原则的理解要灵活&#xff0c;不…

前端vue-父传子

父传子的话是在components中创建一个子组件MyTest.vue&#xff0c;并且在父组件中先导入(import MyTest from "./components/MyTest")&#xff0c;再注册&#xff08;在expo二default中写上 compnents:{MyTest}&#xff09;&#xff0c;再使用标签&#xff08;<My…

深度学习后门攻击分析与实现(一)

在计算机安全中&#xff0c;后门攻击是一种恶意软件攻击方式,攻击者通过在系统、应用程序或设备中植入未经授权的访问点,从而绕过正常的身份验证机制,获得对系统的隐蔽访问权限。这种"后门"允许攻击者在不被检测的情况下进入系统,执行各种恶意活动。 后门可以分为几种…

开源项目 GAN 漫画风格化 UGATIT

开源项目&#xff1a;DataBall / UGATIT GitCode * 数据集 * [该项目制作的训练集的数据集下载地址(百度网盘 Password: gxl1 )](https://pan.baidu.com/s/1683TRcv3r3o7jSitq3VyYA) * 预训练模型 * [预训练模型下载地址(百度网盘 Password: khbg )](https://pan.ba…

安卓实现导入Excel文件

使用简化版的jar包 api files(libs/poi-3.12-android-a.jar) api files(libs/poi-ooxml-schemas-3.12-a.jar) 导入遇到了两个兼容问题 1.build.gradle文件里面 android { 要添加 packagingOptions {exclude META-INF/INDEX.LIST } 2.加载大文件要在清单文件里面加androi…

2023年全国研究生数学建模竞赛华为杯B题DFT类矩阵的整数分解逼近求解全过程文档及程序

2023年全国研究生数学建模竞赛华为杯 B题 DFT类矩阵的整数分解逼近 原题再现&#xff1a; 一、问题背景   离散傅里叶变换&#xff08;Discrete Fourier Transform&#xff0c;DFT&#xff09;作为一种基本工具广泛应用于工程、科学以及数学领域。例如&#xff0c;通信信号…

YOLO交通目标识别数据集(红绿灯-汽车-自行车-卡车等)

YOLO交通目标识别 数据集 模型 ui界面 ✓图片数量15000&#xff0c;xml和txt标签都有&#xff1b; ✓class&#xff1a;biker&#xff0c;car&#xff0c;pedestrian&#xff0c;trafficLight&#xff0c;trafficLight-Green&#xff0c;trafficLight-GreenLeft&#xff0c; t…

java se 快速入门

文章目录 java se 快速入门Java 简介Java的优点jdk 和 jre安装jdk配置环境变量Java 语法快速入门程序入口文件名类规范 基本语法注释变量和常量输入输出条件语句循环语句 基本数据类型Java字符串常用方法字符串拼接java字节数组和字符串相互转化java字符数组和字符串相互转换ja…

美畅物联丨技术前沿探索:H.265编码与畅联云平台JS播放器的融合应用

一、H.265 编码&#xff1a;视频压缩技术的重大变革 H.265&#xff0c;即被熟知为高效视频编码&#xff08;HEVC&#xff0c;High Efficiency Video Coding&#xff09;&#xff0c;由国际电信联盟电信标准化部门视频编码专家组&#xff08;ITU-T VCEG&#xff09;与国际标准化…

去噪扩散隐式模型

dataset_name "datasets/oxford-102-flowers/" dataset_repetitions 2 # 数据集重复 num_epochs 25 image_size 64 # 模型训练和生成图像的大小 # KID 内核初始距离 kid_image_size 75 # 从噪声中逐步“去噪”或“扩散”到最终图像所需的步骤数。 kid_diffusi…

计算机毕业设计Python+Flask微博情感分析 微博舆情预测 微博爬虫 微博大数据 舆情分析系统 大数据毕业设计 NLP文本分类 机器学习 深度学习 AI

首先安装需要的python库&#xff0c; 安装完之后利用navicat导入数据库文件bili100.sql到mysql中&#xff0c; 再在pycharm编译器中连接mysql数据库&#xff0c;并在设置文件中将密码修改成你的数据库密码。最后运行app.py&#xff0c;打开链接&#xff0c;即可运行。 B站爬虫数…