flink任务处理下线流水数据,数据遗漏不全(二)
居然还是重量,做一个判断,如果是NaN 就直接获取原始的数据的重量
测试后面会不会出现这个情况!
发现chunjun的代码运行不到5h以后,如果网络不稳定,断开mqtt链接以后,就会永远也连接不上了,更短命!!
分析原因:
1、配置mqtt服务器的信息,配置设置选项
2、设置回调函数,当数据来的时候,处理数据;当失去连接的时候,先关闭连接,然后尝试连接数据,但是没有订阅主题,所以我觉得是这个原因
之前的代码为什么有问题
我觉得是没有设置数据是否连接上就中断连接了,所以失败,其次是主题重复了!!
2023-4-08 21:25分处理
问题:chunjun的的确是不会漏数据了,但是运行不了多久以后mqtt中断了,死活连接不上
2023-04-14日,成功解决这个问题!!!
原因:1、在高并发度的情况下,即使用时间戳来设置clientid也会重复,加一个随机数!
2、数据量太大,处理的程序太慢了导致的,增加并行度
贴上我的代码!
package org.example.mqtt;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
/**
* MQTT客户端订阅消息类
*
* @author zhongyulin
*/
@NoArgsConstructor
@Data
public class JsonMqttSource extends RichParallelSourceFunction<JSONObject> {
//阻塞队列存储订阅的消息
public static BlockingQueue<JSONObject> queue;
private String topic;
private transient MqttClient client;
public JsonMqttSource(String topic) {
this.topic = topic;
}
public Logger logger = LoggerFactory.getLogger(this.getClass());
//flink线程启动函数
@Override
public void run(SourceContext<JSONObject> ctx) throws Exception {
queue = new SynchronousQueue<>(false);
client=new Connector().connect(topic);
client.subscribe(topic, 0);
//利用死循环使得程序一直监控主题是否有新消息
while (true) {
//使用阻塞队列的好处是队列空的时候程序会一直阻塞到这里不会浪费CPU资源
ctx.collect(queue.take());
}
}
@Override
public void cancel() {
}
}
package org.example.mqtt;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class Connector {
public Logger logger = LoggerFactory.getLogger(this.getClass());
public static MqttConnectOptions getOptions(){
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setAutomaticReconnect(true);
options.setKeepAliveInterval(10);
options.setMaxReconnectDelay(10);
options.setUserName("admin");
options.setPassword("123456".toCharArray());
return options;
}
public static MqttClient connect(String topic) {
//连接mqtt服务器
MqttClient client = null;
for(int i=0;i<20;i++){
try {
client = new MqttClient("tcp://ip:1883", "monitor"+System.currentTimeMillis()+ "_"+ new Random().nextLong());
client.connect(getOptions());
if(client.isConnected()){
client.setCallback(new MqttCallBack(client,topic));
break;
}
} catch (MqttException e) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
}
}
return client;
}
}
回调函数
package org.example.mqtt;
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttCallBack implements MqttCallbackExtended {
private MqttClient client;
private String topic;
public Logger logger = LoggerFactory.getLogger(this.getClass());
public MqttCallBack(MqttClient client,String topic) {
this.client = client;
this.topic = topic;
}
//连接失败回调该函数
@Override
public void connectionLost(Throwable throwable) {
logger.info("进入失败的回调函数......"+throwable.getMessage());
while(true){
if(client != null && client.isConnected()){
try {
client.disconnect();
client.close();
} catch (MqttException e) {
if(client!=null){
try {
client.close();
} catch (MqttException ex) {
ex.printStackTrace();
}
}
logger.error("手动断开连接 "+e.getMessage());
}
}
logger.info("开始连接......");
client=Connector.connect(topic);
if (client.isConnected()){
logger.info("终于连接上了......");
break;
}
}
}
//收到消息回调该函数
@Override
public void messageArrived(String s, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
try {
JsonMqttSource.queue.put(JSONObject.parseObject(msg));
} catch (InterruptedException e) {
logger.error(e.getMessage() + "\n" + msg);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
@Override
public void connectComplete(boolean b, String s) {
try {
client.subscribe(topic, 0);
logger.info("订阅主题成功了 "+client.getClientId());
} catch (MqttException e) {
e.printStackTrace();
}
}
}