1、Spring 项目代码结构如下:
2、数据库资源配置文件如下:
#sql配置文件
spring.datasource.driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver
#.19為測試地址,.13為正式地址
spring.datasource.url=jdbc:sqlserver://172.12.100.19:1433;DatabaseName=data
spring.datasource.username=wms1234
spring.datasource.password=wms1234
spring.datasource.hikari.connection-timeout=20000
spring.datasource.hikari.connection-test-query=SELECT 1
#mybatis配置
mybatis.mapper-locations=classpath:mapper/*.xml
#配置映射的实体类
mybatis.type-aliases-package=com.device.search.dao
3、Kafka实现类源码
package com.device.search.kafka;
import java.io.File;
import java.io.IOException;
import java.security.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import com.aliyun.api.internal.mapping.Converter;
import com.aliyun.api.internal.parser.json.JsonConverter;
import com.device.search.dao.AimerBoxlistDao;
import com.device.search.dao.AimerPackboxDao;
import com.device.search.dao.AimerToWms2BDao;
import com.device.search.model.EdiOutErpData;
import com.device.search.model.PackListData;
import com.device.search.service.AimerBoxlistService;
import com.device.search.service.AimerPackboxService;
import com.device.search.service.AimerToWms2BService;
import com.device.search.utils.TimeUtil;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.gson.Gson;
import com.qimen.api.request.DeliveryorderBatchcreateRequest;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.device.search.model.SendData;
/**
 * Polling worker that reads pack-box records through the injected services, converts each box
 * and its detail lines into a {@link SendData} JSON message and publishes it to a Kafka topic.
 *
 * <p>The loop runs until the thread is interrupted. While {@link #packListBack_flag} equals
 * {@code "0"} nothing is sent and the worker only sleeps between checks. Every request, broker
 * response and error is additionally written to a per-day directory below {@code log_path}.
 */
public class PackListSendThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(PackListSendThread.class);

    /** Value of {@link #packListBack_flag} that pauses sending. */
    private static final String PAUSED_FLAG = "0";
    /** Pause between two polling cycles, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 3000L;
    /** Order-type values that mark an omni-channel O2O (toC) outbound order. */
    private static final String OTYPE_O2O_UPPER = "全渠道O2O";
    private static final String OTYPE_O2O_LOWER = "全渠道o2o";

    private final KafkaProducer<String, String> producer;
    private final String topic;

    /**
     * Send switch: {@code "0"} pauses the worker, any other value lets it send.
     * Declared volatile so that a value assigned by another thread becomes visible to the
     * polling loop.
     */
    public static volatile String packListBack_flag = "0";

    private final AimerPackboxService aimerPackboxService;
    private final AimerBoxlistService aimerBoxlistService;
    private final AimerToWms2BService aimerToWms2BService;

    private final String log_path = "d:\\log\\PackListBack\\";
    private final String err_log = "PackListBack_err_";
    private final String rsp_log = "PackListBack_rsp_";
    private final String req_log = "PackListBack_req_";

    /**
     * Creates the worker and its Kafka producer.
     *
     * @param brokers             value for the producer's {@code bootstrap.servers} setting
     * @param topic               topic every message is published to
     * @param aimerPackboxService service used to load boxes and to update their status
     * @param aimerBoxlistService service used to load the detail lines of a box
     * @param aimerToWms2BService service used to look up the outbound order of a detail line
     */
    public PackListSendThread(String brokers, String topic, AimerPackboxService aimerPackboxService, AimerBoxlistService aimerBoxlistService, AimerToWms2BService aimerToWms2BService)
    {
        Properties prop = createProducerConfig(brokers);
        this.producer = new KafkaProducer<String, String>(prop);
        this.topic = topic;
        this.aimerBoxlistService = aimerBoxlistService;
        this.aimerPackboxService = aimerPackboxService;
        this.aimerToWms2BService = aimerToWms2BService;
    }

    /**
     * Builds the producer configuration: String key/value serializers, {@code acks=all} and
     * no client-side retries.
     *
     * @param brokers value for {@code bootstrap.servers}
     * @return the populated producer properties
     */
    private static Properties createProducerConfig(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Polls for boxes and sends them until the thread is interrupted.
     *
     * <p>Each cycle is followed by a sleep of {@link #POLL_INTERVAL_MS}, also when the worker
     * is paused or the cycle failed, so the loop never spins. A failed cycle is logged and
     * retried on the next poll. On interruption the producer is closed and the interrupt
     * status is restored before the method returns.
     */
    @Override
    public void run() {
        logger.info("PackListSendThread started, topic={}", topic);
        try {
            while (true) {
                try {
                    // Compare by content; the flag may hold a String that is not the interned literal.
                    if (!PAUSED_FLAG.equals(packListBack_flag)) {
                        sendPendingBoxes();
                    }
                } catch (InterruptedException interrupted) {
                    throw interrupted;
                } catch (Exception ex) {
                    logger.error("PackListBack polling cycle failed", ex);
                    try {
                        // toString() instead of getMessage(): the message alone may be null.
                        writeLogFile(rsp_log + "eRR", ex.toString());
                    } catch (IOException ioException) {
                        logger.warn("Could not write PackListBack error log", ioException);
                    }
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } catch (InterruptedException interrupted) {
            logger.info("PackListSendThread interrupted, closing producer");
            try {
                // The InterruptedException cleared the interrupt status, so close() is not
                // cut short while it waits for records that are still in flight.
                producer.close();
            } finally {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Loads the boxes returned by {@code findbyid("N")} and sends each of them.
     * Any failure propagates to {@link #run()} and ends the current cycle.
     */
    private void sendPendingBoxes() throws Exception {
        // presumably "N" selects boxes that have not been sent yet; confirm against the mapper
        List<AimerPackboxDao> pendingBoxes = aimerPackboxService.findbyid("N");
        if (pendingBoxes == null) {
            return;
        }
        for (AimerPackboxDao box : pendingBoxes) {
            sendBox(box);
        }
    }

    /**
     * Builds the pack-list message for one box, writes the request log and hands the message
     * to the producer. A box without detail lines, or whose first order has no outbound
     * data, is flagged through {@code update_boxno_err} and not sent.
     *
     * @param box the box header to send
     */
    private void sendBox(AimerPackboxDao box) throws Exception {
        final String boxNo = box.getBoxCode();

        final PackListData packListData = new PackListData();
        packListData.setWarehouseID(box.getWhsId());
        packListData.setCARTONID(boxNo);
        packListData.setConsigneeid(box.getCustom());
        packListData.setWaveno(box.getTicno());
        packListData.setGrossweight(box.getWeight());
        packListData.setCartontype(box.getBoxType());
        packListData.setPackno(box.getPackPort());
        packListData.setPackwho(box.getOperator());
        packListData.setStarttime(box.getCreateDate());
        packListData.setEndtime(box.getDoneDate()); // 2022-04-28
        // 2022-04-08: toC pack lists also carry the carrier and the waybill number
        packListData.setCarrierid(box.getCysname());
        packListData.setDeliverno(box.getMailno());

        List<AimerBoxlistDao> boxItems = aimerBoxlistService.findbyboxno(boxNo);
        if (boxItems == null || boxItems.isEmpty()) {
            // no detail lines for this box
            logger.error("回传箱明细:没有箱明细数据" + boxNo);
            aimerPackboxService.update_boxno_err(boxNo, boxNo + "回传箱明细:没有箱明细数据");
            return;
        }

        String ordno = boxItems.get(0).getOrdno();
        List<AimerToWms2BDao> headerOrders = aimerToWms2BService.find2bOrdnoUsr02(ordno, "2");
        if (headerOrders == null || headerOrders.isEmpty()) {
            // no outbound order for the first detail line
            logger.error("Aimer_to_wms明细:没有出库数据" + boxNo);
            aimerPackboxService.update_boxno_err(boxNo, boxNo + ":" + ordno + "Aimer_to_wms明细:没有出库数据");
            return;
        }
        AimerToWms2BDao headerOrder = headerOrders.get(0);

        // Constant-first equals: a null order type simply means "not O2O" instead of an NPE.
        boolean o2o = OTYPE_O2O_UPPER.equals(headerOrder.getOtype())
                || OTYPE_O2O_LOWER.equals(headerOrder.getOtype());

        long shipqty = 0;
        List<EdiOutErpData> items = new ArrayList<EdiOutErpData>();
        for (AimerBoxlistDao item : boxItems) {
            // 2022-04-02: one box may hold several orders, so the order is looked up per line
            List<AimerToWms2BDao> itemOrders = aimerToWms2BService.find2bOrdnoUsr02(item.getOrdno(), "2");
            if (itemOrders == null || itemOrders.isEmpty()) {
                logger.error("Aimer_to_wms明细:没有出库数据11" + boxNo);
                aimerPackboxService.update_boxno_err(boxNo, boxNo + ":" + item.getOrdno() + "Aimer_to_wms明细1:没有出库数据");
                // NOTE(review): only this line is skipped; the box is still sent without it
                // and later marked via update_boxno. Confirm that this is intended.
                continue;
            }
            AimerToWms2BDao itemOrder = itemOrders.get(0);

            // NOTE(review): the sum is truncated to long on every line, so fractional
            // quantities are dropped per step. Kept as in the original.
            shipqty = (long) (shipqty + Double.parseDouble(item.getBoxQty()));

            EdiOutErpData line = new EdiOutErpData();
            line.setSku(item.getBarCode());
            line.setQty(item.getBoxQty());
            // 2022-04-08: toC (O2O) lines report the order number instead of usr02
            if (o2o) {
                line.setSoreference1(itemOrder.getOrdno());
            } else {
                line.setSoreference1(itemOrder.getUsr02());
            }
            line.setNotes(itemOrder.getOrdno());
            items.add(line);
        }

        packListData.setSOType(resolveSoType(headerOrder, o2o));
        packListData.setShippedQty(shipqty);
        packListData.setItemData(items);

        SendData sendData = new SendData();
        sendData.setopr_type("add");
        sendData.settimestamp(Long.toString(System.currentTimeMillis()));
        sendData.setPackListData(packListData);
        final String msg = new Gson().toJson(sendData);

        writeLogFile(req_log + packListData.getCARTONID(), msg);
        producer.send(new ProducerRecord<String, String>(topic, msg),
                completionCallback(boxNo, packListData, msg));
    }

    /**
     * Maps the order to the SO type of the message: {@code "SO"} for O2O orders, otherwise
     * {@code "OT"} when usr01 is {@code SQDCK}, otherwise {@code "SC"} (which covers
     * {@code B2BCK}, {@code DBCK}, a null usr01 and every other value).
     */
    private static String resolveSoType(AimerToWms2BDao headerOrder, boolean o2o) {
        if (o2o) {
            return "SO";
        }
        if ("SQDCK".equals(headerOrder.getUsr01())) { // 2022-03-29
            return "OT";
        }
        return "SC";
    }

    /**
     * Creates the send callback for one box. On success the broker response is logged and
     * the box is marked through {@code update_boxno}. On failure only the error is recorded
     * and the box is NOT marked.
     */
    private Callback completionCallback(final String boxNo, final PackListData packListData, final String msg) {
        return new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                String resString = new Gson().toJson(metadata);
                if (e != null) {
                    logger.error("Kafka send failed for box " + boxNo, e);
                    try {
                        writeLogFile(err_log + packListData.getCARTONID(), resString);
                    } catch (IOException ioException) {
                        logger.warn("Could not write PackListBack error log", ioException);
                    }
                    // Stop here: the record was not delivered, so the box must not be marked
                    // as sent. Presumably it is picked up again by a later poll; confirm
                    // against the findbyid query.
                    return;
                }
                try {
                    writeLogFile(rsp_log + packListData.getCARTONID(), resString);
                } catch (IOException ioException) {
                    logger.warn("Could not write PackListBack response log", ioException);
                }
                logger.info("Sent:{}, Partition: {}, Offset: {}", msg, metadata.partition(), metadata.offset());
                aimerPackboxService.update_boxno(boxNo);
            }
        };
    }

    /**
     * Writes {@code content} to {@code <log_path><day>\<namePrefix>_<timestamp>.xml}, creating
     * the day directory when it is missing. One timestamp is used for both the directory and
     * the file name so they cannot disagree across midnight.
     *
     * @param namePrefix leading part of the file name
     * @param content    text to write
     * @throws IOException if {@code TimeUtil.OutJson} fails to write the file
     */
    private void writeLogFile(String namePrefix, String content) throws IOException {
        Date now = new Date();
        String dayDir = log_path + TimeUtil.convertDay(now);
        File dir = new File(dayDir);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        String fileName = dayDir + "\\" + namePrefix + "_" + TimeUtil.convertDate(now) + ".xml";
        TimeUtil.OutJson(content, fileName);
    }
}