前言
有关java和C的交互的基本概念和知识,本文不再详述。有需要的可以参考我的这篇文章。
JNI、DLL、SO等相关概念
开发背景
C项目端开发了一套股票市场资讯推送的功能,多个小组都会用到该功能,为了避免重复开发,中台小组要负担中台责任,为同事们准备一个高可用的订阅入口。
原料
C++项目(拥有订阅功能,后面称之为C端),我的Java SpringMVC中台系统项目(后文称之为Java端),以及C++同事提供的DLL和SO文件和api的jar包,一个充当客户的前端访问页面(称之为前端)
Java代码开发思路 (SocketIO)
需求比较复杂,分为两个具体需求。
第一个
需求端需要一个前端H5页面,能实时展示各种最新的推送消息,比较交易量,资金量,成交量,撤回量等等。
第一个需求比较简单,因为他不针对客户,不需要做定制化推送。因为在开发第二个需求的过程实际上就包括了第一个需求,所以就不写具体开发代码了。有需要的同学,看懂了第二个需求开发,也就知道第一个怎么写了。
第二个
需求端需要一个调用接口,方便不同客户根据需要订阅不同的具体产品。
比如茅台股票和比亚迪股票,某个具体具体标的的价格波动信息,或者直接查看军工行业、电子行业的整个行业需求,又或者整个市场的资金流向等情况。
这一部分的需求看起来简单,实际上要考虑的东西不少。
分析:
1. 保存客户唯一标识。
既然是面对客户的订阅做定制化推送,那么我们就要对客户的身份做存储,我这里是在项目中庸map存储里客户的唯一标识和订阅请求。
2.SocketIOServer
用一个静态的SocketIOServer对象来处理与C端项目的交互,拿到订阅的返回数据后,再用它把数据send出去
3.用SocketIOClient 的room来异步推送数据
客户订阅后需要做到不停地推送信息到具体的用户,为了保证订阅消息推送到正确用户,单独向每个客户推送数据的成本很大,因为用户的数量可能是十万,百万级的,为了避免这种情况,这里用了SocketIOClient的room概念用来减少推送次数。
(room概念,即每个订阅相同讯息,相同要求。比如多个订阅茅台股票,以及波动率在百分之6以上的客户们就会分配到同一个room中,那么就可以一次推送,在room里的所有用户都会接收到了。)
这样下来,比如所有订阅茅台股票的波动率的推送,我们最多只需要10个room就可以处理好了(0.01,0.02.一直到到0.10)
4.订阅与取消订阅
客户继续订阅时,他的多次订阅结果会加到一起,而取消某个订阅目标时,订阅结果则会去掉这个目标。为了完成这个目的,用到了多线程以及每个订阅的用户都会拥有唯一的一个SocketIOClient对象,也就是下面代码的client。
socketIOServer.addEventListener(eventName, JSONObject.class, (client, data, ackSender) -> {
});
代码实践开发
MyServletContextListener (关于这个类的解释可以参考我这篇文章MyServletContextListener)
在这个类里面加上下列代码
import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.x.tcp.SimpleQrySpi;
import com.x.tcp.SubThread;
import com.x.util.UserCQCCertificateApi;
import com.qcvalueaddproapi.CQCValueAddProApi;
import org.springframework.web.context.support.WebApplicationContextUtils;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.io.InputStream;
import java.util.*;
public class MyServletContextListener implements ServletContextListener{
public static SocketIOServer socketIOServer;
public static Map<String, SocketIOClient> uidThread = new HashMap<String, SocketIOClient>();
@Override
public void contextDestroyed(ServletContextEvent arg0) {
// TODO Auto-generated method stub
}
@Override
public void contextInitialized(ServletContextEvent arg0) {
//SOCKETIO
try { //把qcvalueaddproapi和qcvalueaddproapi_jini这两个dll文件放到java_library_path这个目录下后,执行这段代码就可以将dll文件导入到java程序的地址空间
String java_library_path = System.getProperty("java.library.path");
System.out.println("java.library.path"+java_library_path);
System.loadLibrary("qcvalueaddproapi");
System.loadLibrary("qcvalueaddproapi_jini");
}
catch (Exception ex)
{
System.out.printf("loadLibrary %s \n",ex.toString());
}
System.out.println(String.format("GetApiVersion(): %s",CQCValueAddProApi.GetApiVersion()));
//创建Api和Spi对象 Api对象是jar包里的,SimpleQrySpi 对象是我自己写的,后面会给出
CQCValueAddProApi theapi = CQCValueAddProApi.CreateInfoQryApi();
SimpleQrySpi thespi =new SimpleQrySpi(theapi);
//java项目放给客户端请求的访问host和port
String host = null;
int port = 0;
try {
//获取当前类加载器
ClassLoader classLoader= MyServletContextListener.class.getClassLoader();
//通过当前累加载器方法获得 文件config.properties的一个输入流
InputStream is=classLoader.getResourceAsStream("config.properties");
//创建一个Properties 对象
Properties properties=new Properties();
//加载输入流
properties.load(is);
//C端提供给Java端的ip,port,以及id和pwd,用来登录连接到C端
thespi.m_ipaddress=properties.getProperty("cqc2.m_ipaddress");
thespi.m_port= Integer.parseInt(properties.getProperty("cqc2.m_port"));
thespi.m_investorid=properties.getProperty("cqc2.m_investorid");
thespi.m_passwd=properties.getProperty("cqc2.m_passwd");
//java端预备提供给前端的host和port(目前是localhost和10240)
host = properties.getProperty("socket.host");
port = Integer.parseInt(properties.getProperty("socketio.port"));
}catch (Exception e){
e.printStackTrace();
}
theapi.RegisterSpi(thespi);
theapi.RegisterFront(SimpleQrySpi.m_ipaddress,SimpleQrySpi.m_port);
theapi.Run(false); //带上参数登录到C端,到这里就完成了java端与C端的连接
///
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
config.setHostname(host);
config.setPort(port);
//静态的server创建好了
socketIOServer = new SocketIOServer(config);
//这个函数会监听所有带有RequestReceive的前端请求并执行逻辑,对应前端代码后面会贴出
socketIOServer.addEventListener("RequestReceive", JSONObject.class, (client, data, ackSender) -> {
//证明了addEventListener是多线程的
System.out.println("Current Thread Name: " + Thread.currentThread().getName());
//用uid来做用户的唯一标识
String uid = getParamsByClient(client);
if (uid != null )
{ //给每个uid配一个唯一的SocketIOClient对象并存起来,用client来joinroom,leaveroom
if(!uidThread.containsKey(uid)){
uidThread.put(uid, client);
}
System.out.println("Before uid:"+ uid + " client:"+client);
client = uidThread.get(uid);
System.out.println("After uid:"+ uid + " client:"+client);
System.out.println("After uid:"+ uid + " client:getNamespace: "+client.getNamespace().getName());
//这个对象是我写的,里面client会joinroom,leaveroom,然后发送订阅请求到C端
SubThread thesubthread = new SubThread(thespi,theapi,data,client);
thesubthread.run();//启动了
// thesubthread.start();
}
});
socketIOServer.start(); //启动了
System.out.println("输出: socket.io start:");
}
/**
* 此方法为获取client连接中的参数,可根据需求更改
* @param client
* @return
*/
private String getParamsByClient(SocketIOClient client) {
// 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
List<String> list = params.get("UID");
if (list != null && list.size() > 0) {
return list.get(0);
}
return null;
}
}
注释加很多了,不明白可以给我留言
SimpleQrySpi
//异步返回推送数据
public class SimpleQrySpi extends CQCValueAddProSpi {
public static String m_ipaddress = new String();
public static int m_port = 0;
public static String m_investorid = new String();
public static String m_passwd = new String();
public static int m_Last_QryStockDayQuotation_PageLocate = 0;
public CQCValueAddProApi m_api = null;
public AtomicInteger m_reqid = new AtomicInteger(100000);
public AtomicBoolean m_is_connected = new AtomicBoolean(false);
public AtomicBoolean m_is_logon = new AtomicBoolean(false);
//预留给前端页面匹配的推送eventName
public static final String PUSH_SUBSCRIBE = "push_subscribe";
//订阅价格波动
public static final String EffectOrderDetail= "Eorder";
public static final String UnEffectOrderDetail= "UnEorder";
@Override
public void OnRspUserLogin( x rsp, x pRspInfo, int nRequestID, boolean bIsLast) {
if (null != rsp) { xxxxxx }
if (0 == pRspInfo.getErrorID()) {
m_is_connected.set(true);
m_is_logon.set(true);
}
}
@Override
public void OnRspEOrder(x rsp, x pRspInfo, int nRequestID, boolean bIsLast) {
System.out.println(String.format("OnRspEOrder IsLast=%B RequestID=%d ErrorID=%d ErrorMsg=%s ",
bIsLast, nRequestID, pRspInfo.getErrorID(), pRspInfo.getErrorMsg()));
}
@Override
//这个就是C端推送时会回调的异步方法
//rtn这个就是C端送回来的数据
public void OnRtnEOrder(x rtn) {
System.out.println("OnRtnEOrder :" + rtn.getSecurityID());
//radioMap的定义在SubThread里
Set<Double> radioSet = radioMap.get(EffectOrderDetail+ rtn.getSecurityID());
//radioSet里有几个radio,那就要分作几个room发送
for (Double radio : radioSet) {
System.out.println("radioSet遍历中:"+radio);
//如果radioset里的值比返回里的值小或等于,那么就推送。大就不推送
if(radio.compareTo(rtn.getEffectRatio()) <= 0){
String s2 =EffectOrderDetail+ rtn.getSecurityID()+ radio;
System.out.println("s2:"+ s2);
System.out.println("AllClients:"+socketIOServer.getAllClients());
socketIOServer.getRoomOperations(EffectOrderDetail+ rtn.getSecurityID()+ radio).sendEvent(PUSH_SUBSCRIBE, rtn);
System.out.println("这个radio的room:"+radio+"推送了!");
}
}
System.out.println("PUSH_SUBSCRIBE:EffectOrderDetail success");
}
//退订价格波动异常委托明细请求
@Override
public void OnRspUnEorder(x x, x rsp, int nRequestID, boolean bIsLast) {
//这里没什么逻辑,确定取消订阅成功就行
}
}
SubThread
import com.alibaba.fastjson.JSONObject;
import com.corundumstudio.socketio.SocketIOClient;
import com.qcvalueaddproapi.CQCVDReqOptionGreeceField;
import com.qcvalueaddproapi.CQCValueAddProApi;
import com.qcvalueaddproapi.qcvalueaddproapiConstants;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static com.x.tcp.SimpleQrySpi.*;
import static com.x.timeTask.MyServletContextListener.uidThread;
/**
* @author Wu, x
* @version 1.0
* @date 2023/1/9 11:19
*/
public class SubThread {
private SimpleQrySpi m_spi;
private CQCValueAddProApi m_api;
private JSONObject m_data;
private SocketIOClient m_client;
public AtomicInteger m_reqid = new AtomicInteger(100000);
//key是methodName+securityID,value是对应请求里所有的radio值 存下所有请求里最小的radio
public static Map<String, Set<Double>> radioMap = new HashMap<String, Set<Double>>();
public static Map<SocketIOClient,Double> clientRadioMap = new HashMap<>();
public SubThread(SimpleQrySpi t_spi,CQCValueAddProApi t_api) {
m_spi=t_spi;
m_api=t_api;
}
public SubThread(SimpleQrySpi t_spi, CQCValueAddProApi t_api, JSONObject data, SocketIOClient client) {
m_spi=t_spi;
m_api=t_api;
m_data=data;
m_client=client;
}
public void run() {
boolean flag = true;
while(flag)
{
if(m_spi.m_is_connected.get() && m_spi.m_is_logon.get())
{
String methodName = m_data.getString("interfaceName");
//订阅价格波动明细
if(EffectOrderDetail.equals(methodName)){
String securityID = m_data.getString("securityID");
System.out.println("run: "+methodName+ " securityID:" + securityID);
Double radio = m_data.getDouble("radio");
//将radio加入到room里
String s1 = methodName+securityID + radio;
System.out.println("s1:"+ s1);
clientRadioMap.put(m_client,radio);
m_client.joinRoom(methodName+securityID + radio);
//看看当前请求入参的radio的值是否存在radioSet里并处理。
if(!radioMap.containsKey(methodName+securityID)){
Set<Double> radioSet = new HashSet<>();
radioSet.add(radio);
radioMap.put(methodName+securityID,radioSet);
}else {
Set<Double> radioSet = radioMap.get(methodName+securityID);
//看看radioSet里没有当前请求的radio值,没有就存进来
if (!radioSet.contains(radio) ){
System.out.println("Map里存放的radioSet里没有当前请求的radio值,所以我们需要存下更新这个radio的value");
radioSet.add(radio);
radioMap.put(methodName+securityID,radioSet);
}
}
//重新获取一遍set值
Set<Double> radioSet = radioMap.get(methodName+securityID);
//set里最小的 radio值,拿来使用
Double radioMin = Collections.min(radioSet);
int t_ret1 = m_api.SubscribeEffectOrderDetail(m_data.getString("exchageID").charAt(0), securityID, radioMin);
}
//取消订阅
if(UnEffectOrderDetail.equals(methodName)){
String securityID = m_data.getString("securityID");
System.out.println("run: UnEffectOrderDetail: "+methodName+ " securityID:" + securityID);
//map转为流然后再收集为map,其中对流的操作就是将原本的k,v结构颠倒,这时候我们就拿到了以原本value作为key的新map集合,然后在通过map自带的get方法就可以获取到对应的值
String uid = uidThread.entrySet().stream().collect(Collectors.toMap(entity-> entity.getValue(), entity-> entity.getKey())).get(m_client);
double radio = clientRadioMap.get(m_client);
System.out.println("s3:"+methodName+securityID+radio);
m_client.leaveRoom(methodName+securityID+radio);
uidThread.remove(uid);
int t_ret1 = m_api.UnSubscribeEffectOrderDetail(m_data.getString("exchageID").charAt(0), securityID);
}
}
前端H5请求页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>客户端</title>
</head>
<body>
M1
</body>
<script src="socket.io.js"></script>
<script type="text/javascript">
const serverUri = "http://localhost:10240";
//prd
// const serverUri = "http://xx.xxx.xx.xx:10240";
const sendEvent = "push_subscribe";
const ServerReceive = "RequestReceive";
var socket;
connect(9005);
function connect(uid) {
socket = io.connect(serverUri, {
transports: ['websocket'],
'force new connection': true,
'query': 'UID=' + uid
});
socket.on('connect', function () {
console.log("连接成功");
//如果发送字符串send("hello Server"); *需要修改服务端接收类型
send({
message: "hello Server",
securityID:"002607",
interfaceName:"EffectOrderDetail",
exchageID:"2",
radio:0.05
});
});
socket.on('disconnect', function () {
console.log("连接断开");
});
socket.on('push_subscribe', (data) => { // 全局接收
// 处理data数据
// this.tableMove(data.message)
console.log(data)
// 向后端发送数据
// this.socketMy.emit('事件名', '数据')
})
}
function send(data) {
socket.emit(ServerReceive, data);
}
</script>
</html>
项目结构
标红的jquery-3.2.1.min.js
socket.io.js 可以到网上去下载
总结
如果要将项目部署到linux环境,记得把so文件放到linux环境里。