Files
xueguang_flutter_app/lib/request/websocket/room_websocket.dart
2025-11-28 13:31:23 +08:00

140 lines
3.5 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:app/config/config.dart';
import 'package:app/request/api/room_api.dart';
import 'package:app/request/websocket/room_protocol.dart';
import 'package:logger/logger.dart';
import '../dto/room/rtc_token_dto.dart';
/// Top-level [Logger] instance used for the log output in this file.
Logger logger = Logger();
/// Singleton WebSocket client for a study room.
///
/// Typical flow, as implemented below: call [initToken] to fetch the tokens,
/// [connect] to open the socket (a join-room command is sent automatically
/// and a periodic ping keeps the connection alive), subscribe to [stream] to
/// receive decoded server events, and call [dispose] to tear everything down.
class RoomWebSocket {
  /// Private constructor; the only instance is [_instance].
  RoomWebSocket._();
  static final RoomWebSocket _instance = RoomWebSocket._();

  /// Always returns the same shared instance.
  factory RoomWebSocket() => _instance;

  /// Interval between heartbeat pings.
  static const Duration _heartbeatInterval = Duration(seconds: 15);

  /// Delay before retrying after a failed connection attempt.
  static const Duration _reconnectDelay = Duration(seconds: 3);

  /// Not read or written anywhere in this class; kept because it is public.
  String url = "";

  /// The currently active socket, or `null` when not connected.
  WebSocket? _socket;

  /// Periodic timer that sends the heartbeat ping.
  Timer? _heartbeatTimer;

  /// One-shot timer that schedules the next attempt after a failed connect.
  Timer? _reconnectTimer;

  /// Set by [dispose] and cleared by [connect], so that a connection attempt
  /// that finishes after [dispose] does not bring the socket back to life.
  bool _disposed = false;

  /// WebSocket token for the study room.
  String wsToken = "";

  /// Room id used when connecting.
  int roomId = 0;

  /// RTC token fetched by [initToken].
  RtcTokenDto? rtcToken;

  /// Broadcast controller that fans decoded server messages out to listeners.
  final StreamController<RoomMessage> _msgController =
      StreamController.broadcast();

  /// Decoded server messages; may be listened to by several subscribers.
  Stream<RoomMessage> get stream => _msgController.stream;

  /// Stores [id] as the current room and fetches the RTC and WebSocket tokens.
  ///
  /// Both requests are started before either is awaited, so they run
  /// concurrently. [rtcToken] and [wsToken] are only updated when both
  /// requests succeed; if either fails the error is rethrown to the caller.
  Future<void> initToken(int id) async {
    roomId = id;
    final rtcFuture = getRtcTokenApi(id);
    final wsFuture = getWsTokenApi(id);
    // Wait on both together so that a failure of one request does not leave
    // the other request's error unhandled.
    await Future.wait([rtcFuture, wsFuture]);
    rtcToken = await rtcFuture;
    wsToken = await wsFuture;
  }

  /// Opens the WebSocket connection using [wsToken] and [roomId].
  ///
  /// On success the join-room command is sent and the heartbeat is started.
  /// On failure a retry is scheduled after [_reconnectDelay], unless
  /// [dispose] was called in the meantime.
  Future<void> connect() async {
    _disposed = false;
    // A pending retry is superseded by this attempt.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    try {
      final socket = await WebSocket.connect(
        "${Config.wsUrl()}?token=$wsToken&study_room_id=$roomId",
      );
      if (_disposed) {
        // dispose() ran while the handshake was in flight; drop this socket.
        socket.close();
        return;
      }
      logger.i("连接成功");
      _reconnectTimer?.cancel();
      _reconnectTimer = null;

      // Replace any earlier socket so only one connection feeds the stream.
      final previous = _socket;
      _socket = socket;
      previous?.close();

      // Listen for incoming messages.
      socket.listen(
        _handleData,
        onDone: () => _handleClosed(socket),
        onError: (_) {
          logger.e("连接异常断开");
        },
      );

      // Join the room automatically.
      send(RoomCommand.joinRoom);

      // Heartbeat.
      _heartbeatTimer?.cancel();
      _heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) {
        send(RoomCommand.ping);
      });
    } catch (e) {
      logger.e("连接失败");
      if (!_disposed) {
        _reconnect();
      }
    }
  }

  /// Decodes one incoming frame and publishes it on [stream].
  ///
  /// Frames that are not text, are not valid JSON, are not a JSON object, or
  /// carry an action that [RoomEvent.fromStr] does not recognise are logged
  /// and ignored.
  void _handleData(dynamic data) {
    if (data is! String) {
      logger.w("收到非文本消息,已忽略");
      return;
    }
    Object? decoded;
    try {
      decoded = jsonDecode(data);
    } on FormatException {
      logger.w("消息解析失败,已忽略: $data");
      return;
    }
    if (decoded is! Map<String, dynamic>) {
      logger.w("消息格式不正确,已忽略: $data");
      return;
    }
    final event = RoomEvent.fromStr(decoded['action']);
    if (event == null) {
      logger.w("未识别的 action: ${decoded['action']},消息已忽略");
      return;
    }
    logger.i("""
接收到事件: ${event.value}
数据: ${decoded['data']}
""");
    _msgController.add(RoomMessage(event, decoded['data']));
  }

  /// Stops the heartbeat once [socket] has closed.
  ///
  /// Does nothing if [socket] is no longer the active socket (it was already
  /// replaced by a newer connection or released by [dispose]).
  void _handleClosed(WebSocket socket) {
    if (!identical(_socket, socket)) {
      return;
    }
    _heartbeatTimer?.cancel();
    _heartbeatTimer = null;
    _socket = null;
  }

  /// Sends [action] to the server as a JSON object.
  ///
  /// The entries of [params], if given, are merged into the same object next
  /// to the `action` key. When no open socket is available the command is
  /// logged and dropped instead of throwing.
  void send(RoomCommand action, [Map<String, dynamic>? params]) {
    final msg = {
      "action": action.value,
      ...?params,
    };
    final socket = _socket;
    if (socket == null || socket.readyState != WebSocket.open) {
      logger.w("连接未建立,指令未发送:$msg");
      return;
    }
    // Pings are frequent, so they are kept out of the log.
    if (action != RoomCommand.ping) {
      logger.i("发送指令:$msg");
    }
    socket.add(jsonEncode(msg));
  }

  /// Schedules a single retry of [connect] after [_reconnectDelay].
  ///
  /// A one-shot timer is used so that a slow attempt can never overlap with
  /// the next one; [connect] re-arms the timer itself if it fails again.
  void _reconnect() {
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_reconnectDelay, () {
      logger.e("正在重连");
      connect();
    });
  }

  /// Stops the heartbeat, closes the socket and cancels any pending retry.
  ///
  /// The message controller is left open (its close call was already
  /// commented out in the original code) — presumably so the singleton can be
  /// connected again later; confirm before closing it here.
  void dispose() {
    _disposed = true;
    // Stop the heartbeat.
    _heartbeatTimer?.cancel();
    _heartbeatTimer = null;
    // Close the socket.
    _socket?.close();
    _socket = null;
    // _msgController.close();
    // Cancel the pending retry.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    logger.i("websocket销毁成功");
  }
}
/// An event sent by the WebSocket server together with its data.
class RoomMessage {
  /// Creates a message for [event] carrying [data].
  RoomMessage(this.event, this.data);

  /// The event this message represents.
  final RoomEvent event;

  /// The data attached to the event; its shape is not constrained here.
  final dynamic data;
}