前言
小编将在 CSDN 上记录软件开发求学之路上的亲身所得与所学心得,有兴趣的小伙伴可以关注一下!
也许一个人独行可以走得很快,但一群人结伴而行才能走得更远!让我们在成长的道路上互相学习、共同进步,欢迎关注!
本文分享基于 WebSocket 技术对接 AllTick 金融行情(股票)的实战经验:通过订阅第三方 wss 接口的数据来获取实时行情,并兼顾高性能与高效率。
1、在 Spring Boot 项目中引入 WebSocket 对应的依赖(jar 包)
<!-- Spring Boot WebSocket starter. No <version> is declared here, so the version is presumably inherited from the Spring Boot parent/BOM (confirm in the surrounding pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、创建 WebSocketConfig,注册 ServerEndpointExporter,用于暴露 @ServerEndpoint 端点
package com.nq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that contributes the WebSocket endpoint exporter bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the {@link ServerEndpointExporter} bean.
     *
     * <p>NOTE(review): presumably this is what registers the {@code @ServerEndpoint}-annotated
     * classes of the application with the embedded container; confirm against the Spring docs.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter endpointExporter = new ServerEndpointExporter();
        return endpointExporter;
    }
}
3、创建 WebSocket 客户端,用于连接第三方的 wss 服务
package com.nq.common;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nq.pojo.Stock;
import com.nq.service.IStockService;
import com.nq.utils.PropertiesUtil;
import com.nq.utils.StringUtils;
import com.nq.utils.redis.RedisShardedPool;
import com.nq.utils.redis.RedisShardedPoolUtils;
import com.nq.vo.stock.StockListVO;
import com.nq.vo.websocket.CodeVo;
import com.nq.vo.websocket.StockVo;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.hpsf.Decimal;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.websocket.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @Description: websocket客户端
* 一共2800条code产品数据,每个webSocketServer处理1000条数据,分三个webSocketServer处理
* 提高效率
* @Author: jade
* @Date: 2021/8/25 10:25
*/
@ClientEndpoint
@Slf4j
@Component //交给spring容器管理
@Data
public class WebSocketJavaExample {
private Session session; //会话对象
private Boolean flag = true; //用来拯救流的关闭标识符
private Map<String,StockVo> stockVoMap;
private List<StockVo> stockList; //返回给客户端的封装数据List集合
public final static Integer MAXCAP = 1000; //一次性以1000条
@Resource //使用@Resource来装配,不然会为null
IStockService stockService;
private ObjectMapper objectMapper=new ObjectMapper();
@OnOpen
public void onOpen(Session session) {
this.session = session;
}
/**
接收第三方服务端的消息
**/
@OnMessage
public void onMessage(String message) {
if(message.indexOf("data") != -1) {
try {
JSONObject jsonObject = JSONUtil.parseObj(message);
String dataStr = jsonObject.getStr("data");//第三方响应的Json数据
if (dataStr != null) {
// JSONArray jsonArray = JSONUtil.parseArray(dataStr);
// JSONObject jsonObject = JSONUtil.parseObj(dataStr);
// jsonArray.stream().forEach(item -> {
JSONObject json = JSONUtil.parseObj(dataStr);
Optional<StockVo> stockVo = stockList.stream()//Optional为java8的Stream API中使用,处理可能为null的元素
.filter(p -> json.getStr("code").equals(p.getCode().concat(".US"))).findFirst();
// .filter(p -> json.getStr("code").equals(p.getCode())).findFirst();
stockVo.ifPresent(vo -> {
// 当前价格
BigDecimal nowPrice = new BigDecimal(json.getStr("price"));
BigDecimal preClosePrice = vo.getPreclose_px();
vo.setType(json.getStr("trade_direction"));
// alltick websocket 获取数据 替换原来的当前价格和涨幅
vo.setNowPrice(nowPrice);
// 计算涨幅
BigDecimal chg = nowPrice.subtract(preClosePrice).divide(preClosePrice, 4, BigDecimal.ROUND_HALF_UP).multiply(new BigDecimal(100));
vo.setHcrate(chg);
});
log.info("Optional<StockVo> send message to client"+stockVo);
// });
} else {
log.error("data字段不是一个有效的JSON数组: {}", dataStr);
}
} catch (Exception e) {
log.error("解析消息时发生异常: {}", e.getMessage());
}
}
}
@OnClose
public void onClose(Session session, CloseReason closeReason) {
flag=false;
log.info("AllTick API Docs流已关闭,关闭原因:{}",closeReason.toString());
}
@OnError
public void onError(Throwable e) {
log.error("AllTick API Docs连接异常{}",e.getMessage());
}
@Async
public void sendMessage(String key,String message) throws Exception {
session.getBasicRemote().sendText(message);
// log.info("client:{}, AllTick API Docs 请求报文: {}", key, message);
// if (this.session != null && this.session.isOpen()) {
// this.session.getBasicRemote().sendText(message);
// } else {
// log.error("会话已关闭,无法发送消息: {}", key);
// }
}
//websocket地址
private String url=PropertiesUtil.getProperty("WebSocket.url");
//token数据
private String token= PropertiesUtil.getProperty("WebSocket.token");
public static List<WebSocketJavaExampleInfo> webSocketJavaExampleList = new ArrayList<>();
@PostConstruct
public void initPool() {
new Thread(()->{ //另外起一个线程执行websocket,不影响主线程
run();
}).start();
}
@PreDestroy
public void destroy() {
if (this.session != null && this.session.isOpen()) {
try {
this.session.close();
} catch (IOException e) {
log.error("关闭WebSocket连接时发生异常: {}", e.getMessage());
}
}
}
@Async
public void run(){
try {
List<Stock> list = stockService.findStockList();
int len = list.size();
int capacity = (int) Math.ceil((double) len / MAXCAP);//向上取整
// int capacity = (int) Math.ceil(len / MAXCAP);
// if (capacity<1 || len % capacity != 0 ) {
// capacity++;
// }
List<CodeVo> codeVos = new ArrayList<>();
log.info("开始连接AllTick API Docs,请求url:{}",url.concat(token));
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
URI uri = new URI(url.concat(token)); // Replace with your websocket endpoint URL
for (int i = 0; i < capacity; i++) {
WebSocketJavaExample client = new WebSocketJavaExample(); //多个客户client执行,每个客户端执行1000条数据
container.connectToServer(client, uri);
List<Stock> list1 = list.stream().skip(i * MAXCAP).limit(MAXCAP).collect(Collectors.toList());
stockList = new ArrayList<>();
list1.forEach(item -> {
CodeVo codeVo = new CodeVo();
codeVo.setCode(item.getStockCode().concat(".US"));
// codeVo.setCode(item.getStockCode());
codeVos.add(codeVo);
StockVo stockVo = new StockVo();
try {
// 数据初始化
String us = RedisShardedPoolUtils.get(item.getStockGid(), 4);
StockListVO stockListVO = objectMapper.readValue(us, StockListVO.class);
stockVo.setName(stockListVO.getName());
stockVo.setCode(stockListVO.getCode());
stockVo.setGid(stockListVO.getGid());
stockVo.setStock_type(stockListVO.getStock_type());
stockVo.setType(stockListVO.getType());
stockVo.setHcrate(stockListVO.getHcrate());
stockVo.setOpen_px(new BigDecimal(stockListVO.getOpen_px()));
stockVo.setNowPrice(new BigDecimal(stockListVO.getNowPrice()));
stockVo.setPreclose_px(new BigDecimal(stockListVO.getPreclose_px()));
stockVo.setIsOption(Integer.valueOf(stockListVO.getIsOption()));
stockList.add(stockVo);
} catch (JsonProcessingException e) {
log.info("redis数据转换对象stockListVO异常",e.getMessage());
}
}
);
JSONArray symbolList = new JSONArray(codeVos); // 直接将List转换为JSONArray
client.setStockList(stockList);
// 使用LinkedHashMap来保持顺序
Map<String, Object> messageMap = new LinkedHashMap<>();
messageMap.put("cmd_id", 22004);
messageMap.put("seq_id", 123);
messageMap.put("trace", "3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806");
Map<String, Object> dataMap = new LinkedHashMap<>();
dataMap.put("symbol_list", symbolList);
messageMap.put("data", dataMap);
// 将LinkedHashMap转换为JSONObject
JSONObject message2 = new JSONObject(messageMap);
String message = message2.toString();
// String message = "{\"cmd_id\":22004,\"seq_id\":123,\"trace\":\"3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806\",\"data\":{\"symbol_list\": "+ JSONUtil.toJsonStr(codeVos) +"}}";
client.sendMessage("client" + i, message);
webSocketJavaExampleList.add(new WebSocketJavaExampleInfo(client, "client" + i,message));
codeVos.clear();
// 创建一个TimerTask任务
int finalI = i;
TimerTask task3 = new TimerTask() {
@SneakyThrows
@Override
public void run() {
//定时获取心跳
try {
client.sendMessage("client" + finalI,"{\n" +
" \"cmd_id\":22000,\n" +
" \"seq_id\":123,\n" +
" \"trace\":\"3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806\",\n" +
" \"data\":{\n" +
" }\n" +
"}");
} catch (Exception e) {
log.error(e.getMessage());
}
}
};
new Timer().schedule(task3, 10000,10000);
new Thread().sleep(2000);
}
}catch (Exception e){
e.printStackTrace();
log.error("AllTick API Docs 连接失败:{}",e.getMessage());
}
}
/**
定时任务,应用启动后延迟 2.5 分钟开始执行,之后每隔 2.5 分钟执行一次,去勘测是否流关闭,然后拯救连接websocket
**/
@Scheduled(fixedRate = 1 * 15 * 10000,initialDelay = 150000)
public void Daemon() throws Exception {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
URI uri = new URI(url.concat(token)); // Replace with your websocket endpoint URL
for(WebSocketJavaExampleInfo webSocketJavaExampleInfo:webSocketJavaExampleList){
if(!webSocketJavaExampleInfo.getClient().flag){
container.connectToServer(webSocketJavaExampleInfo.getClient(), uri);
webSocketJavaExampleInfo.getClient().sendMessage(webSocketJavaExampleInfo.getKey(), webSocketJavaExampleInfo.getMessage());
}
}
}
}
/**
 * Registry entry describing one upstream connection: the client endpoint object, the label
 * used for it in logs, and the subscription message that was sent over it (kept so that the
 * same subscription can be sent again after a reconnect).
 */
@Data
class WebSocketJavaExampleInfo {

    /** Endpoint object that owns the connection. */
    private WebSocketJavaExample client;

    /** Label of the connection, e.g. {@code "client0"}. */
    private String key;

    /** Subscription request sent over this connection. */
    private String message;

    /**
     * Creates an entry.
     *
     * @param client  endpoint object that owns the connection
     * @param key     label of the connection
     * @param message subscription request sent over the connection
     */
    public WebSocketJavaExampleInfo(final WebSocketJavaExample client, final String key, final String message) {
        this.message = message;
        this.key = key;
        this.client = client;
    }
}
4、创建 WebSocket 服务端,供前端连接访问,并将客户端收到的行情数据推送给前端
package com.nq.common;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.nq.vo.websocket.StockVo;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.nq.common.WebSocketJavaExample.MAXCAP;
import static com.nq.common.WebSocketJavaExample.webSocketJavaExampleList;
@ServerEndpoint("/ws/{userId}")
@EnableWebSocket
@Component
@Data
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
private static final Integer pageSize = 100;//页码
private Integer pageNo;//页数
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 查询code
*/
private String code = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
webSocketMap.put(userId, this);
} else {
webSocketMap.put(userId, this);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("用户消息:" + userId + ",报文:" + message);
JSONObject jsonObject = JSONUtil.parseObj(message);
// pageSize = jsonObject.getInt("pageSize");
pageNo = jsonObject.getInt("pageNo");
code = jsonObject.getStr("code");
// if (ObjectUtil.isNotEmpty(code)) { //如果code不为空,则查询并推送数据
// queryAndSendStockVo(code, session);
// }
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发消息
*
* @throws IOException
*/
@PostConstruct
public void BroadCastInfo() throws IOException, InterruptedException {
log.info("开始轮询批量推送数据");
ThreadUtil.execAsync(() -> {
run();
});
}
public void run() {
WebSocketServer webSocketServer;
List<StockVo> list = null;
List<StockVo> additionalList = null;
List<StockVo> list2 = null;
while (true) {
try {
if (webSocketMap.size() > 0) {
for (Map.Entry<String, WebSocketServer> stringWebSocketServerEntry : webSocketMap.entrySet()) {
webSocketServer = stringWebSocketServerEntry.getValue();
if (ObjectUtil.isEmpty(webSocketServer.pageNo) && ObjectUtil.isNotEmpty(webSocketServer.pageSize) && ObjectUtil.isEmpty(webSocketServer.getCode())) {//如果默认没有参数 就传输3千条
// if(ObjectUtil.isEmpty(webSocketServer.pageNo)) {//如果默认没有参数 就传输3千条
list = webSocketJavaExampleList.get(0).getClient().getStockList().stream().limit(1000).collect(Collectors.toList());
} else if (ObjectUtil.isNotEmpty(webSocketServer.pageNo)) {
int pageSize = webSocketServer.pageSize;
int pageNo = webSocketServer.pageNo;
Integer size = pageNo * pageSize;
// int capacity = (int) Math.ceil(size / 20);
int capacity = (int) Math.ceil(size / MAXCAP);
int pageno = (capacity * 1000 / pageSize);
pageNo -= pageno;
if (capacity == 0) {
list = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(pageNo * pageSize).collect(Collectors.toList());
}
if (capacity == 1) {
list = webSocketJavaExampleList.get(0).getClient().getStockList().stream().collect(Collectors.toList());
additionalList = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(size - (MAXCAP * capacity)).collect(Collectors.toList());
list = Stream.concat(list.stream(), additionalList.stream()).collect(Collectors.toList());
}
if (capacity == 2) {
list = webSocketJavaExampleList.get(0).getClient().getStockList().stream().collect(Collectors.toList());
list2 = webSocketJavaExampleList.get(1).getClient().getStockList().stream().collect(Collectors.toList());
additionalList = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip(0).limit(size - (MAXCAP * capacity)).collect(Collectors.toList());
list = Stream.concat(Stream.concat(list.stream(), list2.stream()), additionalList.stream())
.collect(Collectors.toList());
}
// list = webSocketJavaExampleList.get(capacity).getClient().getStockList().stream().skip((pageNo - 1) * pageSize).limit(pageSize).collect(Collectors.toList());
} else {
String queryCode = webSocketServer.getCode();
// 使用并行流处理数据,减少嵌套循环
list = webSocketJavaExampleList.parallelStream()
.flatMap(webSocketJavaExampleInfo -> webSocketJavaExampleInfo.getClient().getStockList().stream())
.filter(stockVo -> stockVo.getCode().contains(queryCode))
.collect(Collectors.toList());
}
try {
stringWebSocketServerEntry.getValue().sendMessage(JSONUtil.toJsonStr(list));
} catch (IOException e) {
log.error("用户编码为:{},推送ws数据异常,异常原因:{}", webSocketServer.getUserId(), e.getMessage());
}
}
}
} catch (Exception e) {
log.error("推送失败: {}", e.getMessage());
} finally {
try {
new Thread().sleep(2000);
} catch (InterruptedException e) {
log.error("没有客户端:{},webSocketMap:{}", e.getMessage(), webSocketMap.size());
}
}
}
}
}
以上是小编在开发过程中针对 WebSocket 技术的实战经验总结:通过调用第三方 wss 接口的数据来获取实时数据。