flink任务处理下线流水数据,数据遗漏不全(三)

news2024/11/14 20:35:03

flink任务处理下线流水数据,数据遗漏不全(二)

居然还是重量,做一个判断,如果是NaN    就直接获取原始的数据的重量

测试后面会不会出现这个情况!


发现chunjun的代码运行不到5h以后,如果网络不稳定,断开mqtt链接以后,就会永远也连接不上了,更短命!!

分析原因:

1、配置mqtt服务器的信息,配置设置选项

2、设置回调函数,当数据来的时候,处理数据;当失去连接的时候,先关闭连接,然后尝试连接数据,但是没有订阅主题,所以我觉得是这个原因

之前的代码为什么有问题

  我觉得是没有设置数据是否连接上就中断连接了,所以失败,其次是主题重复了!!

2023-4-08 21:25分处理

问题:chunjun的的确是不会漏数据了,但是运行不了多久以后mqtt中断了,死活连接不上


2023-04-14日,成功解决这个问题!!!

原因:1、在高并发度的情况下,即使用时间戳来设置clientid也会重复,加一个随机数!

 

2、数据量太大,处理的程序太慢了导致的,增加并行度

 贴上我的代码!

package org.example.mqtt;

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
 * MQTT客户端订阅消息类
 *
 * @author zhongyulin
 */
@NoArgsConstructor
@Data
public class JsonMqttSource extends RichParallelSourceFunction<JSONObject> {
    //阻塞队列存储订阅的消息
    public static BlockingQueue<JSONObject> queue;
    private String topic;
    private transient MqttClient client;
    public JsonMqttSource(String topic) {
        this.topic = topic;
    }
    public Logger logger = LoggerFactory.getLogger(this.getClass());
    //flink线程启动函数
    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
        queue = new SynchronousQueue<>(false);
         client=new Connector().connect(topic);
         client.subscribe(topic, 0);
        //利用死循环使得程序一直监控主题是否有新消息
        while (true) {
            //使用阻塞队列的好处是队列空的时候程序会一直阻塞到这里不会浪费CPU资源
            ctx.collect(queue.take());
        }
    }
    @Override
    public void cancel() {

    }
}
package org.example.mqtt;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Connector {
    public  Logger logger = LoggerFactory.getLogger(this.getClass());
    public static  MqttConnectOptions getOptions(){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setKeepAliveInterval(10);
        options.setMaxReconnectDelay(10);
        options.setUserName("admin");
        options.setPassword("123456".toCharArray());
        return options;
    }


    public static MqttClient connect(String topic) {
        //连接mqtt服务器
        MqttClient client = null;
        for(int i=0;i<20;i++){
            try {
                client = new MqttClient("tcp://ip:1883", "monitor"+System.currentTimeMillis()+ "_"+ new Random().nextLong());
                client.connect(getOptions());
                if(client.isConnected()){
                    client.setCallback(new MqttCallBack(client,topic));
                    break;
                }
            } catch (MqttException e) {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException interruptedException) {
                    throw new RuntimeException(interruptedException);
                }

            }
        }
        return client;
    }
}

回调函数

package org.example.mqtt;

import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttCallBack implements MqttCallbackExtended {
    private MqttClient client;
    private String topic;
    public Logger logger = LoggerFactory.getLogger(this.getClass());
    public MqttCallBack(MqttClient client,String topic) {
        this.client = client;
        this.topic = topic;
    }

    //连接失败回调该函数
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("进入失败的回调函数......"+throwable.getMessage());
        while(true){
            if(client != null && client.isConnected()){
                try {
                    client.disconnect();
                    client.close();
                } catch (MqttException e) {
                    if(client!=null){
                        try {
                            client.close();
                        } catch (MqttException ex) {
                            ex.printStackTrace();
                        }
                    }
                    logger.error("手动断开连接  "+e.getMessage());
                }
            }
            logger.info("开始连接......");
            client=Connector.connect(topic);
            if (client.isConnected()){
                logger.info("终于连接上了......");
                break;
            }

        }
    }


    //收到消息回调该函数
    @Override
    public void messageArrived(String s, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        try {
            JsonMqttSource.queue.put(JSONObject.parseObject(msg));
        } catch (InterruptedException e) {
            logger.error(e.getMessage() + "\n" + msg);
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {
        try {
            client.subscribe(topic, 0);
            logger.info("订阅主题成功了    "+client.getClientId());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/439976.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EasyCVR平台基于GB28181协议的语音对讲配置操作

EasyCVR基于云边端协同&#xff0c;具有强大的数据接入、处理及分发能力&#xff0c;平台可支持海量视频的轻量化接入与汇聚管理&#xff0c;可提供视频监控直播、视频轮播、视频录像、云存储、回放与检索、智能告警、服务器集群、语音对讲、云台控制、电子地图、平台级联等功能…

IT行业里的热门技术

© Ptw-cwl 文章目录 1. 人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;2. 大数据3. Web2.04. 移动应用程序开发5. 物联网6. 云计算7. 区块链8. 5G技术9. 虚拟现实&#xff08;VR&#xff09;和增强现实&#xff08;AR&#xff09; 现在如果问…

如何使用Midjourney辅助建筑设计,常用的提示和使用效果展示(内附Midjourney提示词网站)

文章目录 一.Midjourney建筑设计的提示技巧1. prompt模板12.prompt模板2 二、著名建筑师为例1.Zaha Hadid&#xff08;扎哈哈迪德&#xff09;2.Ludwig Mies van der Rohe&#xff08;路德维希密斯凡德罗&#xff09;3.Renzo Piano&#xff08;皮亚诺&#xff09;4.Stefano Boe…

PEIS源码,体检管理系统源码,C#医院体检系统源码

PEIS体检管理系统源码&#xff0c;医院体检系统源码PEIS源码&#xff0c;商业级源码&#xff0c;有演示。 PEIS医院体检管理系统采用C/S结构&#xff0c;前台开发工具为Vs2012&#xff0c;后台数据库采用oracle大型数据库。核心功能有&#xff1a;体检档案的录入、体检报告的输…

从入门到精通:SEO站外优化全面解析

通过​​第三章​​上下连续两期的干货内容&#xff0c;相信你已经掌握了 SEO 优化的基本方法&#xff0c;但你有没有发现&#xff1a;之前的内容都是针对网站本身进行优化的方法&#xff0c;其实&#xff0c;SEO 还有相当一部分功夫要放在站外优化上。 问题来了&#xff1a;站…

上海震坤行工业超市聚焦量具量仪市场,助力企业实现测量数字化

上海震坤行工业超市聚焦量具量仪市场&#xff0c;助力企业实现测量数字化 近日&#xff0c;量具量仪三大品牌哈量、广陆、英示的三位重磅嘉宾及震坤行磨具量具产线总经理&#xff0c;走进震坤行工业超市直播间。带来了一场“聚焦量具量仪市场&#xff0c;助力企业实现测量数字…

C语言中结构体(struct)的详细分解与使用

目录 第一&#xff1a;结构体的定义 第二&#xff1a;规则 第三&#xff1a;结构体声明 第四&#xff1a;C 语言结构体定义的三种方式 第五&#xff1a;对于结构体变量的初始化 第六&#xff1a;整体与分开 第七&#xff1a;结构体长度 第八&#xff1a;嵌入式开发中&am…

第 三 章 UML 类图

文章目录 前言一、依赖关系&#xff08;虚线箭头&#xff09;二、泛化关系&#xff1a;继承&#xff08;实线空心箭头&#xff09;三、实现关系&#xff08;虚线空心箭头&#xff09;四、关联关系&#xff08;一对一为实线箭头&#xff0c;一对多为实线&#xff09;五、聚合关系…

如何实现24小时客户服务

许多企业都有着这样的愿望&#xff1a;在不增加客服人员的同时能实现24小时客户服务。 那么有没有什么方法可以实现这一想法呢&#xff1f;在想解决方案之前我们可以先来谈谈客服的作用。 客服的作用主要为以下2点&#xff1a; 帮助用户更快地了解产品&#xff08;减轻产品的…

Linux常用基础指令大全

在使用Aistudio平台学习PaddlePaddle时&#xff0c;常常会用到linux指令&#xff0c;对于之前没有学习过linux指令的我来说&#xff0c;确实有点难度&#xff0c;在学习了一段时间之后&#xff0c;慢慢也适应了linux指令&#xff0c;这一篇博客主要记录linux中的常用指令。以AI…

typescipt的运行环境搭建

1.安装node 官网地址&#xff1a;https://nodejs.org/en 2.安装完成后&#xff0c;运行node -v 检测安装版本 3.全局安装typescipt npm i typescipt -g 用tsc检测是否安装好 创建一个demo.ts文件 在终端运行tsc demo.ts会出现错误 找到windows powershell以管理员身份运行…

算法训练day2:哈希表

哈希表理论基础 哈希表是根据关键码的值而直接进行访问的数据结构。 当我们遇到了要快速判断一个元素是否出现集合里的时候&#xff0c;就要考虑哈希法。 但是哈希法也是牺牲了空间换取了时间&#xff0c;因为我们要使用额外的数组&#xff0c;set或者是map来存放数据&#…

LeetCode:454. 四数相加 II —— 哈希表为什么叫哈希表~

&#x1f34e;道阻且长&#xff0c;行则将至。&#x1f353; &#x1f33b;算法&#xff0c;不如说它是一种思考方式&#x1f340; 算法专栏&#xff1a; &#x1f449;&#x1f3fb;123 hash是什么&#xff0c;哈希表为什么叫哈希表&#xff1f; 一、&#x1f331;454. 四数…

【hello Linux】进程概念(下)

目录 1. 通过系统调用创建进程—fork 1.1 通过fork创建进程&#xff1a; 1.2 如何不退出 vim 直接执行命令呢&#xff1f; 3. fork创建进程的本质 4. 父子进程的分流&#xff1a; 2. 进程状态 3. 信号 3.1 显示全部信号 3.1 停止进程 3.2 继续进程 3.3 杀死进程 后台进程 4. 僵…

在pycharm2020上部署配置AutoGPT4.0,保姆级教程

前期环境及软件准备&#xff1a; 1&#xff09;pycharm版本2020及以上 2&#xff09;python版本3.10及以上 3&#xff09;pip版本20及以上&#xff08;新一点的版本&#xff09; 4&#xff09;安装git&#xff0c;无版本要求 正式开工 具体配置步骤如下&#xff1a; 1.AUTOGP…

【UE 粒子系统】使用GPU渲染粒子

GPU Sprite 是虚幻引擎4中可用的粒子类型之一。这些粒子首先在CPU上生成&#xff0c;但之后完全由显卡处理和计算。这样做的好处是&#xff0c;由于GPU负责处理计算&#xff0c;因此可以同时处理成千上万的粒子&#xff0c;从而实现更密集和更细节化的粒子系统。 在上一篇博客&…

PPC380AE102 HIEE300885R0102现代自动化技术

PPC380AE102 HIEE300885R0102现代自动化技术 ​ 交流伺服电机驱动器示例 目前世界人口已经达到了78亿&#xff0c;并且还在不断增加&#xff0c;预计到2050年将达到100亿。日益增长的人口既有对衣服&#xff0c;食物等基本必需品的需求&#xff0c;对舒适、安全生活的追求也不断…

C++并发数据结构设计

关键词&#xff1a;原子操作&#xff0c;无锁设计 引入问题-> 为什么需要原子操作-> 原子操作实现以及原理-> c原子操作接口-> c基于原子操作的数据结构设计-> 原子操作 什么是原子操作 所谓原子操作,就是"不可中断的一个或一系列操作" 。 2…

列表和元组(上)——“Python”

各位CSDN的uu们你们好呀&#xff0c;今天小雅兰的内容是Python中的列表&#xff0c;下面&#xff0c;让我们进入列表的世界吧 列表是什么, 元组是什么 创建列表 访问下标 切片操作 遍历列表元素 列表是什么, 元组是什么 编程中, 经常需要使用变量, 来保存/表示数据. 如果…

Swift 注释和文档

今天&#xff0c;我知道我写是什么&#xff0c;上帝和我知道 明天&#xff0c;我知道这个代码什么意思&#xff0c; 后天&#xff0c;我知道这是我写的代码&#xff0c; 一周后&#xff0c;这TM谁写的代码&#xff0c;此时只有上帝才知道啥意思 论代码注释的重要性。 普通…