flutter开发实战-长链接WebSocket使用stomp协议stomp_dart_client
在app中经常会使用长连接进行消息通信,这里记录一下基于websocket使用stomp协议的使用。
一、stomp:流文本定向消息协议
1.1 stomp介绍
stomp,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。
它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。
1.2 协议支持
stomp 1.0
stomp 1.1 (including heart-beating)
1.3 stomp frame(帧)
stomp frame(帧)对象包括command、headers、body
command和headers属性始终会被定义,若头部信息时,headers参数可为{},body也可能为空
二、flutter上使用stomp
2.1 引入库stomp_dart_client
flutter上使用stomp时,需要在pubspec.yaml引入库如下
# stomp协议长链接
stomp_dart_client: ^0.4.4
stomp: ^0.8.0
2.2 实现websocketmanager封装stomp
// 管理长链接socket, stomp协议
import 'package:stomp_dart_client/stomp.dart';
import 'package:stomp_dart_client/stomp_config.dart';
import 'package:stomp_dart_client/stomp_frame.dart';
// 接收到stomp协议的frame的callback
typedef OnFrameCallback = void Function(StompFrame);
enum StompState {
IDLE,
CREATED,
CONNECTING,
CONNECTED,
RECONNECTING,
DISCONNECTED,
ERROR,
}
class WebSocketStompManager {
//私有构造函数
WebSocketStompManager._internal();
//保存单例
static WebSocketStompManager _singleton = WebSocketStompManager._internal();
//工厂构造函数
factory WebSocketStompManager() => _singleton;
// 订阅的Subscription
// 保存订阅, id: dynamic
Map _subscriptions = Map<String, dynamic>();
// stomp的headers信息
Map<String, String>? _headers = Map<String, String>();
// 是否连接
StompState _stompState = StompState.IDLE;
// 当前连接的Url
String _urlString = '';
// StompClient client
StompClient? _client;
// 创建连接
void createConnect(String urlString, Map<String, String> headers) {
_urlString = urlString;
_headers = _headers;
_client?.deactivate();
_client = null;
_client = StompClient(
config: StompConfig(
url: urlString,
// connectionTimeout: Duration(seconds: 10),
// stompConnectHeaders: {
// 'upgraded': 'websocket',
// },
// webSocketConnectHeaders: {
// 'upgraded': 'websocket',
// },
// 连接
beforeConnect: beforeConnectCallback,
onConnect: onConnectCallback,
onDisconnect: onDisconnectCallback,
onStompError: onStompErrorCallback,
onUnhandledFrame: onUnhandledFrameCallback,
onUnhandledMessage: onUnhandledMessageCallback,
onUnhandledReceipt: onUnhandledReceiptCallback,
onWebSocketError: onWebSocketErrorCallback,
onWebSocketDone: onWebSocketDoneCallback,
onDebugMessage: onDebugMessageCallback,
));
}
/// beforeConnect:未来 在建立连接之前将等待的异步函数。
Future<void> beforeConnectCallback() async {
// 在建立连接之前将等待的异步函数。
print("beforeConnectCallback 在建立连接之前将等待的异步函数。");
print('waiting to connect...');
// await Future.delayed(Duration(milliseconds: 200));
print('connecting...');
}
/// onClientNotCreateCallback, client未创建
void onClientNotCreateCallback() {
// client未创建
print("onClientNotCreateCallback client未创建");
}
/// onConnect:函数(StompFrame) 客户端连接成功调用的函数
void onConnectCallback(StompFrame connectFrame) {
// client is connected and ready
// 如果连接成功
print(
"onConnectCallback 客户端连接成功调用的函数:"
"${connectFrame.toString()},"
"${connectFrame.command},"
"${connectFrame.headers},"
"${connectFrame.body}"
);
}
/// onDisconnect:函数(StompFrame) 客户端预期断开连接时调用的函数
void onDisconnectCallback(StompFrame p1) {
// 客户端预期断开连接时调用的函数
print("onDisconnectCallback 客户端预期断开连接时调用的函数:${p1.toString()}");
}
/// onStompError:函数(StompFrame) 当 stomp 服务器发送错误帧时要调用的函数
void onStompErrorCallback(StompFrame p1) {
// 当 stomp 服务器发送错误帧时要调用的函数
print("onStompErrorCallback 当 stomp 服务器发送错误帧时要调用的函数:${p1.toString()}");
}
/// onUnhandledFrame:函数(StompFrame) 服务器发送无法识别的帧时调用的函数
void onUnhandledFrameCallback(StompFrame p1) {
// 服务器发送无法识别的帧时调用的函数
print("onUnhandledFrameCallback 服务器发送无法识别的帧时调用的函数:${p1.toString()}");
}
/// onUnhandledMessage:函数(StompFrame) 当订阅消息没有处理程序时要调用的函数
void onUnhandledMessageCallback(StompFrame p1) {
// 当订阅消息没有处理程序时要调用的函数
print("onUnhandledMessageCallback 当订阅消息没有处理程序时要调用的函数:${p1.toString()}");
}
/// onUnhandledReceipt:函数(StompFrame) 当接收消息没有注册观察者时调用的函数
void onUnhandledReceiptCallback(StompFrame p1) {
// 当接收消息没有注册观察者时调用的函数
print("onUnhandledReceiptCallback 当接收消息没有注册观察者时调用的函数:${p1.toString()}");
}
/// onWebSocketError:函数(动态) 当底层 WebSocket 抛出错误时要调用的函数
void onWebSocketErrorCallback(dynamic error) {
// 当底层 WebSocket 抛出错误时要调用的函数
print(
"onWebSocketErrorCallback 当底层 WebSocket 抛出错误时要调用的函数:${error.toString()}");
}
/// onWebSocketDone:函数() 当底层 WebSocket 完成/断开连接时要调用的函数
void onWebSocketDoneCallback() {
// 当底层 WebSocket 完成/断开连接时要调用的函数
print("onWebSocketDoneCallback 当底层 WebSocket 完成/断开连接时要调用的函数");
}
/// onDebugMessage:函数(字符串) 为内部消息处理程序生成的调试消息调用的函数
void onDebugMessageCallback(String p1) {
// 为内部消息处理程序生成的调试消息调用的函数
print("onDebugMessageCallback 为内部消息处理程序生成的调试消息调用的函数:${p1}");
}
// 连接
void connect() {
// connect连接
if (_client != null) {
_client?.activate();
} else {
// 未创建client
onClientNotCreateCallback();
}
}
// Subscribe
void subscribe(String destination, OnFrameCallback? onFrameCallback) {
if (_client != null) {
dynamic unsubscribeFn = _client?.subscribe(
destination: destination,
headers: _headers,
callback: (frame) {
// Received a frame for this subscription
print(frame.body);
if (onFrameCallback != null) {
onFrameCallback(frame);
}
});
_subscriptions.putIfAbsent(destination, () => unsubscribeFn);
} else {
// 未创建client
onClientNotCreateCallback();
}
}
// client.subscribe(...) returns a function which can be called with an optional map of headers
void unsubscribe(String destination) {
if (_client != null) {
dynamic unsubscribeFn = _subscriptions[destination];
unsubscribeFn(unsubscribeHeaders: {});
} else {
// 未创建client
onClientNotCreateCallback();
}
}
// client.subscribe(...) returns a function which can be called with an optional map of headers
void unsubscribeAll() {
// 退订所有
// 调用 Map 对象的 keys 成员 , 返回一个由 键 Key 组成的数组
for (var destination in _subscriptions.keys){
unsubscribe(destination);
}
}
void send(String destination, String? message) {
if (_client != null) {
_client?.send(destination: destination, body: message, headers: _headers);
} else {
// 未创建client
onClientNotCreateCallback();
}
}
void disconnect() {
if (_client != null) {
_client?.deactivate();
} else {
// 未创建client
onClientNotCreateCallback();
}
}
}
2.3 使用websocketmanager收发消息
创建页面进行消息收发
class MyHomePage extends StatefulWidget {
MyHomePage({Key? key, required this.title}) : super(key: key);
final String title;
_MyHomePageState createState() => _MyHomePageState();
}
class _MyHomePageState extends State<MyHomePage> {
void initState() {
// TODO: implement initState
super.initState();
}
void dispose() {
// TODO: implement dispose
super.dispose();
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
// Here we take the value from the MyHomePage object that was created by
// the App.build method, and use it to set our appbar title.
title: Text(widget.title),
),
floatingActionButton: FloatingActionButton(
onPressed: () {
_incrementCounter(model);
},
tooltip: 'Increment',
child: Icon(Icons.add),
),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Wrap(
spacing: 8.0, // 主轴(水平)方向间距
runSpacing: 4.0, // 纵轴(垂直)方向间距
alignment: WrapAlignment.center, //沿主轴方向居中
children: [
TextButton(
onPressed: stompCreate,
child: Container(
color: Colors.black26,
child: Text(
'stomp创建',
style: Theme.of(context).textTheme.bodyMedium,
),
),
),
TextButton(
onPressed: stompConnect,
child: Container(
color: Colors.black26,
child: Text(
'stomp连接',
style: Theme.of(context).textTheme.bodyMedium,
),
),
),
TextButton(
onPressed: stompSubscribe,
child: Container(
color: Colors.black26,
child: Text(
'stomp订阅',
style: Theme.of(context).textTheme.bodyMedium,
),
),
),
TextButton(
onPressed: stompUnSubscribe,
child: Container(
color: Colors.black26,
child: Text(
'stomp退订',
style: Theme.of(context).textTheme.bodyMedium,
),
),
),
TextButton(
onPressed: stompSendMessage,
child: Container(
color: Colors.black26,
child: Text(
'stomp发送消息',
style: Theme.of(context).textTheme.bodyMedium,
),
),
)
],
),
],
),
),
);
}
// 测试stomp长链接
void stompCreate() {
// 创建stompClint
WebSocketStompManager().createConnect("ws://192.168.100.25:8080/test-endpoint/websocket", {});
}
void stompConnect() {
WebSocketStompManager().connect();
}
void stompSubscribe() {
WebSocketStompManager()
.subscribe("/topic/echo", (p0) {
print("stompSubscribe 1:$p0");
});
WebSocketStompManager()
.subscribe("/topic/echo", (p0) {
print("stompSubscribe 2:$p0");
});
}
void stompUnSubscribe() {
WebSocketStompManager().unsubscribeAll();
}
void stompSendMessage() {
WebSocketStompManager().send("/app/echo", "haha message from dart");
}
}
至此实现了flutter开发实战-长链接WebSocket 使用stomp协议,进行消息发送、消息接收。
2.4 注意事项
由于stomp_dart_client不支持https,如果使用WebSocketStompManager().createConnect(“ws://192.168.100.25:8080/test-endpoint/websocket”, {});
会报告错误“Not support Https shceme”,所以这里要使用ws或者wss。
三、小结
至此实现了flutter开发实战-长链接WebSocket 使用stomp协议,进行消息发送、消息接收。stomp实现的库stomp_dart_client来实现该功能。
学习记录,每天不停进步。