Spring 框架下如何调用kafka

news2024/11/17 11:47:15

1、Spring 项目代码结构如下:

2、数据库资源配置文件如下:

#sql配置文件
spring.datasource.driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver
#.19為測試地址,.13為正式地址
spring.datasource.url=jdbc:sqlserver://172.12.100.19:1433;DatabaseName=data
spring.datasource.username=wms1234
spring.datasource.password=wms1234
spring.datasource.hikari.connection-timeout=20000
spring.datasource.hikari.connection-test-query=SELECT 1 
#mybatis配置
mybatis.mapper-locations=classpath:mapper/*.xml
#配置映射的实体类
mybatis.type-aliases-package=com.device.search.dao 

3、Kafaka实现类源码 

package com.device.search.kafka;

import java.io.File;
import java.io.IOException;
import java.security.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import com.aliyun.api.internal.mapping.Converter;
import com.aliyun.api.internal.parser.json.JsonConverter;
import com.device.search.dao.AimerBoxlistDao;
import com.device.search.dao.AimerPackboxDao;
import com.device.search.dao.AimerToWms2BDao;
import com.device.search.model.EdiOutErpData;
import com.device.search.model.PackListData;
import com.device.search.service.AimerBoxlistService;
import com.device.search.service.AimerPackboxService;
import com.device.search.service.AimerToWms2BService;
import com.device.search.utils.TimeUtil;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.gson.Gson;
import com.qimen.api.request.DeliveryorderBatchcreateRequest;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.device.search.model.SendData;
public class PackListSendThread implements Runnable {

  private static Logger logger = LoggerFactory.getLogger(PackListSendThread.class);
  private final KafkaProducer<String, String> producer;
  private final String topic;
  public static String packListBack_flag="0";
  private AimerPackboxService aimerPackboxService;
  private AimerBoxlistService aimerBoxlistService;
  private AimerToWms2BService aimerToWms2BService;
  private  final String log_path="d:\\log\\PackListBack\\";
  private  final String err_log="PackListBack_err_";
  private  final String rsp_log="PackListBack_rsp_";
  private  final String req_log="PackListBack_req_";
  public PackListSendThread(String brokers, String topic, AimerPackboxService aimerPackboxService, AimerBoxlistService aimerBoxlistService, AimerToWms2BService aimerToWms2BService)
  {
    Properties prop = createProducerConfig(brokers);
    this.producer = new KafkaProducer<String, String>(prop);
    this.topic = topic;
    this.aimerBoxlistService=aimerBoxlistService;
    this.aimerPackboxService=aimerPackboxService;
    this.aimerToWms2BService=aimerToWms2BService;
  }

  private static Properties createProducerConfig(String brokers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

  @Override
  public void run() {
    System.out.println("Produces 3 messages");
    while (true) {
       try {
//         if (packListBack_flag == "0") {
//           continue;
//         }
         synchronized (this) {
           if (packListBack_flag == "0") {
             continue;
           }
         }
         List<AimerPackboxDao> listPackbox = aimerPackboxService.findbyid("N");
         for (int i = 0; i < listPackbox.size(); i++)
         {
           String finalBox_no = listPackbox.get(i).getBoxCode();
           String sotype="SC";
           PackListData packListData = new PackListData();

           packListData.setWarehouseID(listPackbox.get(i).getWhsId());
           packListData.setCARTONID(finalBox_no);
           packListData.setConsigneeid(listPackbox.get(i).getCustom());
           packListData.setWaveno(listPackbox.get(i).getTicno());
           packListData.setGrossweight(listPackbox.get(i).getWeight());
           packListData.setCartontype(listPackbox.get(i).getBoxType());
           packListData.setPackno(listPackbox.get(i).getPackPort());
           packListData.setPackwho(listPackbox.get(i).getOperator());

           packListData.setStarttime(listPackbox.get(i).getCreateDate());
           packListData.setEndtime(listPackbox.get(i).getDoneDate()); //20220428
           //2022.04.08 toC箱单增加运单号和承运商
           packListData.setCarrierid(listPackbox.get(i).getCysname());
           packListData.setDeliverno(listPackbox.get(i).getMailno());

           List<AimerBoxlistDao> listBoxlist = aimerBoxlistService.findbyboxno(finalBox_no);
           long shipqty=0;
           if (listBoxlist.size()<=0)//没有箱明细报错
           {
             logger.error("回传箱明细:没有箱明细数据"+finalBox_no);
             aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+"回传箱明细:没有箱明细数据");
             continue;
           }
           String ordno=listBoxlist.get(0).getOrdno();
           List<AimerToWms2BDao> listAimerToWms2BDao = aimerToWms2BService.find2bOrdnoUsr02(ordno,"2");
           if (listAimerToWms2BDao.size()<=0)//没有listAimerToWms2BDao
           {
             logger.error("Aimer_to_wms明细:没有出库数据"+finalBox_no);
             aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+":"+ordno+"Aimer_to_wms明细:没有出库数据");
             continue;
           }

           List<EdiOutErpData> listReturnBoxlist = new ArrayList<EdiOutErpData>();
           for (AimerBoxlistDao aimerBoxlistDao :listBoxlist)
           {
             //20220402  循环查询箱号对应每个单据的操作号,有一个箱号里多个单子。
             List<AimerToWms2BDao> aimerToWms2BDaoList = aimerToWms2BService.find2bOrdnoUsr02(aimerBoxlistDao.getOrdno(),"2");
             if (aimerToWms2BDaoList.size()<=0)//没有listAimerToWms2BDao
             {
               logger.error("Aimer_to_wms明细:没有出库数据11"+finalBox_no);
               aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+":"+aimerBoxlistDao.getOrdno()+"Aimer_to_wms明细1:没有出库数据");
               continue;
             }
              shipqty = (long) (shipqty +  Double.parseDouble(aimerBoxlistDao.getBoxQty()));
              EdiOutErpData ediOutErpData = new EdiOutErpData();
              ediOutErpData.setSku(aimerBoxlistDao.getBarCode());
              ediOutErpData.setQty(aimerBoxlistDao.getBoxQty());
              ediOutErpData.setSoreference1(aimerToWms2BDaoList.get(0).getUsr02());
              //toC出库箱单明细操作好通过KAFKA回传 --2022.04.08
               if(listAimerToWms2BDao.get(0).getOtype().equals("全渠道O2O") || listAimerToWms2BDao.get(0).getOtype().equals("全渠道o2o")){
                 ediOutErpData.setSoreference1(aimerToWms2BDaoList.get(0).getOrdno());
               }
              ediOutErpData.setNotes(aimerToWms2BDaoList.get(0).getOrdno());
              listReturnBoxlist.add(ediOutErpData);
           }
           if (listAimerToWms2BDao.get(0).getUsr01()!=null)//202203029
           {
             if (listAimerToWms2BDao.get(0).getUsr01().equals("B2BCK"))
             {
               sotype = "SC";
             }
             if (listAimerToWms2BDao.get(0).getUsr01().equals("DBCK"))
             {
               sotype = "SC";
             }
             if (listAimerToWms2BDao.get(0).getUsr01().equals("SQDCK"))
             {
               sotype = "OT";
             }
           }
           //toC出库箱单明细通过KAFKA回传 --2022.04.08
           if(listAimerToWms2BDao.get(0).getOtype().equals("全渠道O2O") || listAimerToWms2BDao.get(0).getOtype().equals("全渠道o2o")){
             sotype = "SO";
           }
           packListData.setSOType(sotype);
           packListData.setShippedQty(shipqty);
           packListData.setItemData(listReturnBoxlist);
           SendData sendData = new SendData();
           sendData.setopr_type("add");
           long time = System.currentTimeMillis();
           sendData.settimestamp(Long.toString(time));
           sendData.setPackListData(packListData);

           String msg = new Gson().toJson(sendData);

           File file=new File(log_path+ TimeUtil.convertDay(new Date()));
           if(!file.exists()){//如果文件夹不存在
             file.mkdirs();//创建文件夹
           }
           String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ req_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
           TimeUtil.OutJson(msg,req_filename);

           producer.send(new ProducerRecord<String, String>(topic, msg), new Callback()
           {
             public void onCompletion(RecordMetadata metadata, Exception e)
             {
               String resString = new Gson().toJson(metadata);
               if (e != null)
               {
                 String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ err_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
                 try {
                   TimeUtil.OutJson(resString,req_filename);
                 } catch (IOException ioException) {
                   ioException.printStackTrace();
                 }
                 e.printStackTrace();
               }
               String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ rsp_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
               try {
                 TimeUtil.OutJson(resString,req_filename);
               } catch (IOException ioException) {
                 ioException.printStackTrace();
               }
               System.out.println("Sent:" + msg + ", Partition: " + metadata.partition() + ", Offset: "
                       + metadata.offset());
               aimerPackboxService.update_boxno(finalBox_no);
             }
           });
         }
         // closes producer
         //producer.close();
         Thread.sleep(3000);
       }catch (Exception ex)
       {
         String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ rsp_log+"eRR_" +TimeUtil.convertDate(new Date())+".xml" ;
         try {
           TimeUtil.OutJson(ex.getMessage(),req_filename);
         } catch (IOException e) {
           e.printStackTrace();
         }
         continue;
       }
    }
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/63513.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[ 红队知识库 ] 常见防火墙(WAF)拦截页面

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu交叉编译Arm Linux环境下的身份证读卡器so库操作步骤

1、配置环境变量 ①将gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu.tar解压至/home/eastcoms/ sudo或者root运行命令 &#xff1a;sudo tar -xvf gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu.tar -C /home/eastcoms .tar用 -xvf .gz用 -zxvf .bz2用 -jxvf …

easypoi导入案例

文章目录easypoi导入案例一、依赖二、导出模板1、excel模板实体类(同下)2、具体实现类3、easypoi工具类中的方法4、自定义样式类三、导入校验1、excel模板实体类2、具体实现类3、自定义信号导入校验类easypoi导入案例 一、依赖 <dependency><groupId>cn.afterturn…

第7 部分 HDLC 和PPP

路由器经常用于构建广域网&#xff0c;广域网链路的封装和以太网上的封装有着非常大的差别。常见的广域网封装有HDLC&#xff0c;PPP 和Frame-relay 等&#xff0c;本次介绍HDLC 和PPP。相对而言&#xff0c;PPP 比HDLC 有较多的功能。 7.1 HDLC 和PPP 简介 7.1.1 HDLC 介绍 H…

批处理及有状态等应用类型在 K8S 上应该如何配置?

众所周知, Kubernetes(K8S)更适合运行无状态应用, 但是除了无状态应用. 我们还会有很多其他应用类型, 如: 有状态应用, 批处理, 监控代理(每台主机上都得跑), 更复杂的应用(如:hadoop 生态...). 那么这些应用可以在 K8S 上运行么? 如何配置? 其实, K8S 针对这些都有对应的不…

操作系统:存储器管理 练习题(带有详细答案解析)

文章目录1.存储器的层次结构2.程序的装入和链接2.1.程序的装入2.2.程序的链接3.连续分配存储管理方式3.1.单一连续分配3.2.固定分区分配3.3.动态分区分配3.4.基于顺序搜索的动态分区分配算法3.5.基于索引搜索的动态分区分配算法3.6.动态可重定位分区分配4.对换4.1.多道程序环境…

SBT 树原理和实战

一 基本概念 SBT&#xff08;Size Balanced Tree&#xff0c;节点大小平衡树&#xff09;是一种自平衡二叉查找树&#xff0c;通过子树的大小来保持平衡。与红黑树、AVL 树等自平衡二叉查找树相比&#xff0c;SBT更易于实现。SBT 可以在 O (logn) 时间内完成所有二叉搜索树的相…

【考研】操作系统复习冲刺(2023年408)

前言 本文内容主要源自于王道讲解的学习笔记总结。梳理《操作系统》考点&#xff08;以理论为重点&#xff09;&#xff0c;并对重点内容划下横线和加粗标注&#xff0c;方便考研复习。 可搭配以下链接一起学习&#xff1a; 【考研复习】《操作系统原理》孟庆昌等编著课后习…

数字IC手撕代码-同步FIFO

前言&#xff1a; 本专栏旨在记录高频笔面试手撕代码题&#xff0c;以备数字前端秋招&#xff0c;本专栏所有文章提供原理分析、代码及波形&#xff0c;所有代码均经过本人验证。 目录如下&#xff1a; 1.数字IC手撕代码-分频器&#xff08;任意偶数分频&#xff09; 2.数字…

磁环选型攻略及EMC整改技巧

磁环选型攻略及EMC整改技巧 今天跟大家分享一下磁环选型及应用相关的知识&#xff0c;希望对你有帮助。 本文将从以下四个方面对磁环进行阐述。 一、磁环的应用场景 首先我们来看几张图片 图1 显示屏VGA线 图2 适配器连接线 图3 USB通信线 这三根线都是我们生活中常见的供电…

简单个人网页设计作业 静态HTML个人博客主页——HTML+CSS+JavaScript 明星鹿晗(7页)

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

ping回显间隔长或第一个包很久才显示是怎么回事?

问题现象 在ping某些域名的时候&#xff0c;第一个回显十几秒才出现&#xff0c;但时延time正常&#xff0c;第二个包开始回显频率正常且最终统计结果为不丢包&#xff1b;或是每一个回显均间隔数秒才显示&#xff0c;但时延time又都是正常的&#xff0c;且统计结果为不丢包。…

U-Net 模型改进和应用场景研究性综述

U-Net综述1 文章介绍2 U-Net介绍3 结构改进4 非结构改进4.1 预处理——数据增强4.2 训练——数据归一化4.3 训练——激活函数4.4 训练——损失函数4.5 结构改进总结5 U-Net应用场景5.1 视网膜血管分割5.2 肺结节分割5.3 肝脏和肝脏肿瘤分割5.4 脑肿瘤分割5.5 不同应用场景总结6…

[附源码]计算机毕业设计基于Springboot校刊投稿系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Vue学习:模板语法

容器里面的模板&#xff1a;对应的模板语法 {{xxx}}:插值语法 指令语法&#xff1a; v-bind&#xff1a;vue指令 绑定 后面的数据会变成属性或者方法 <h1>指令语法</h1><!-- v-bind会将"xxx"里面的内容当成表达式执行 --><a v-bind:href&quo…

这些 MySQL 最朴素的监控方式!用完爱不释手!

对于当前数据库的监控方式有很多&#xff0c;分为数据库自带、商用、开源三大类&#xff0c;每一种都有各自的特色&#xff1b;而对于 mysql 数据库由于其有很高的社区活跃度&#xff0c;监控方式更是多种多样&#xff0c;不管哪种监控方式最核心的就是监控数据&#xff0c;获取…

嵌入式之总线协议:1、UART

嵌入式之总线协议&#xff1a;1、UART 目录 第一章 UART 帧格式讲解 第二章 UART 寄存器讲解 第三章 UART 编程 第四章 输出重定向 第五章 RS232、RS485协议原理与应用 第一章 UART嵌入式之总线协议&#xff1a;1、UART前言一、UART简介1、串行/并行1.1 并行1.2 串行2、异步3、…

C语言第十八课:初阶结构体

目录 前言&#xff1a; 一、结构体类型的声明&#xff1a; 1.结构的基础知识&#xff1a; 2.结构的声明&#xff1a; 3.结构成员允许的类型&#xff1a; 4.结构体变量的定义&#xff1a; 5.结构体变量的初始化&#xff1a; 二、结构体成员的访问&#xff1a; 1.结构体变量访…

[附源码]计算机毕业设计实验室管理系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

4 第一个程序

第一个程序 1 源程序 源程序中包括两种指令&#xff1a;伪指令和汇编指令 汇编指令是有对应机器码的指令&#xff0c;可以用CPU直接执行 伪指令没有对应的机器码&#xff0c;只有编译器执行不用CPU执行 1.1 segment ends segment和ends的功能是定义一个段。使用格式如下 …