摘要:
本示例演示了一个基本的服务端5分钟定时向客户端app推送消息的WebSocket机制。服务端使用WebSocket协议接受客户端的订阅和取消订阅请求,并根据客户端的订阅状态发送实时消息。服务端记录并打印带有时间戳的日志,以监控订阅活动。客户端可以接收来自服务端的消息,并根据需要处理这些消息。
实现流程
1.初始化WebSocket服务端
创建并启动WebSocket服务端,准备接收客户端的连接请求。
/**
* 心跳通常是指客户端或服务器定期发送一个小型的、空的消息以保持连接的活动状态。它用于检测连接是否仍然有效,并防止连接由于长时间没有活动而被关闭。
*
* 推送是指服务器主动向客户端发送实际的数据或消息。服务器可以根据特定的业务逻辑或事件触发,将数据推送给客户端,而不需要客户端发起请求。
*/
package com.fadi.power.netpowerpushservice
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.websocket.WebSockets
import io.ktor.websocket.webSocket
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import java.util.concurrent.ConcurrentHashMap
import java.text.SimpleDateFormat
import java.util.Calendar
fun main() {
embeddedServer(Netty, port = 8080, module = Application::module).start(wait = true)
}
fun Application.module() {
install(WebSockets)
routing {
val connections = AtomicInteger(0)
// 使用 ConcurrentHashMap 以支持线程安全的读写操作
val clients = ConcurrentHashMap<WebSocketSession, Boolean>()
webSocket("/ws") {
connections.incrementAndGet()
println("Client connected. Total connections: ${connections.get()}")
// 默认情况下,新客户端愿意接收 push 消息
clients[this] = true
try {
// 启动一个协程用于定时向客户端发送消息
val pushJob = launch {
while (isActive) {
delay(300000) // 等待5分钟
clients.forEach { (client, wantsPush) ->
if (client.isActive && wantsPush) {
val currentDateTime = Calendar.getInstance().time
val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)
client.send("Server push message at ${formattedDateTime}")
}
}
}
}
for (frame in incoming) {
2.客户端连接和设置是否推送处理
接受客户端连接,并为每个客户端创建一个会话。
companion object {
const val HEARTBEAT_MESSAGE = "Heartbeat"
const val SUBSCRIBE_MESSAGE = "subscribe"
const val UNSUBSCRIBE_MESSAGE = "unsubscribe"
}
override fun onOpen(handshakedata: ServerHandshake?) {
// 连接成功,发送数据或执行其他操作
noteWebSocketClientOpen()
send("Hello, Server!")
Log.d(Config.TAG, "WebSocketClient onOpen: Hello, Server!")
// 【不使用自带的心跳,使用自建Alarm定时】启动定时任务发送心跳消息
// startHeartbeat()
}
// 设置是否推送
fun setEnablePush(enable: Boolean) {
if (enable) {
send(SUBSCRIBE_MESSAGE)
} else {
send(UNSUBSCRIBE_MESSAGE)
}
}
3.消息接收与处理
接收来自客户端的消息,并根据消息内容做出响应。
@Override
public void onMessage(WebSocket conn, String message) {
System.out.println("Received message from client: " + message);
// 处理客户端发送的消息
}
4.订阅与取消订阅逻辑
根据客户端发送的指令更新订阅状态,并记录带时间戳的日志。
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val receivedText = frame.readText()
println("Received message: $receivedText")
when (receivedText) {
"unsubscribe" -> {
val currentDateTime = Calendar.getInstance().time
val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)
println("[$formattedDateTime] unsubscribe: 关闭推送")
}
"subscribe" -> {
clients[this] = true
val currentDateTime = Calendar.getInstance().time
val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)
println("[$formattedDateTime] subscribe: 开启推送")
}
else -> send("Server received: $receivedText")
}
}
else -> {}
}
}
5.推送消息
向已订阅的客户端发送推送消息。
// 启动一个协程用于定时向客户端发送消息
val pushJob = launch {
while (isActive) {
delay(300000) // 等待5分钟
clients.forEach { (client, wantsPush) ->
if (client.isActive && wantsPush) {
val currentDateTime = Calendar.getInstance().time
val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)
client.send("Server push message at ${formattedDateTime}")
}
}
}
}
6.客户端接受消息
override fun onMessage(message: String?) {
// 接收到服务器发送的消息,执行相应的处理逻辑
message?.let {
Log.d(Config.TAG, "WebSocketClient onMessage: Received message: $it")
}
}
7. 日志验证
7.1 服务端
服务端支持动态开启和关闭推送
7.2 app端
App收到定时5分钟的服务器push消息