TermRemoteCtl/apps/mobile_app/lib/features/terminal/terminal_socket_session.dart

191 lines
4.8 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../../core/network/agent_api_client.dart';
import '../../core/network/agent_socket_client.dart';
import '../sessions/session.dart';
/// Signature of a function that creates a [TerminalSocketTransport] for the
/// given socket [Uri].
///
/// Injected into [TerminalSocketSessionFactory] and [TerminalSocketSession] so
/// the transport implementation can be substituted.
typedef TerminalSocketTransportFactory =
TerminalSocketTransport Function(Uri uri);
/// Provides the [TerminalSocketSessionFactory] used to create terminal socket
/// sessions, wired to [WebSocketTerminalSocketTransport.connect].
final terminalSocketSessionFactoryProvider =
    Provider<TerminalSocketSessionFactory>(
      (ref) => TerminalSocketSessionFactory(
        transportFactory: WebSocketTerminalSocketTransport.connect,
      ),
    );
/// Builds [TerminalSocketSession] instances that share one transport factory.
class TerminalSocketSessionFactory {
  /// Creates a factory.
  ///
  /// When [transportFactory] is omitted,
  /// [WebSocketTerminalSocketTransport.connect] is used.
  TerminalSocketSessionFactory({
    TerminalSocketTransportFactory? transportFactory,
  }) : _transportFactory =
           transportFactory ?? WebSocketTerminalSocketTransport.connect;

  /// Transport factory handed to every session this factory creates.
  final TerminalSocketTransportFactory _transportFactory;

  /// Returns a new, not-yet-connected [TerminalSocketSession] for [session],
  /// using an [AgentSocketClient] constructed from [baseUri].
  TerminalSocketSession create({
    required Uri baseUri,
    required Session session,
  }) {
    final id = session.sessionId;
    final client = AgentSocketClient(baseUri);
    return TerminalSocketSession(
      sessionId: id,
      socketClient: client,
      transportFactory: _transportFactory,
    );
  }
}
/// A terminal session attached to the agent over a [TerminalSocketTransport].
///
/// [connect] opens a transport, sends the attach message built by
/// [socketClient], and completes once an `attached` acknowledgement frame is
/// received. [sendInput] and [sendResize] write to the current transport, and
/// [dispose] tears the connection down.
class TerminalSocketSession {
  /// Creates a session for [sessionId].
  ///
  /// When [transportFactory] is omitted,
  /// [WebSocketTerminalSocketTransport.connect] is used.
  TerminalSocketSession({
    required this.sessionId,
    required this.socketClient,
    TerminalSocketTransportFactory? transportFactory,
  }) : _transportFactory =
           transportFactory ?? WebSocketTerminalSocketTransport.connect;

  /// Identifier passed to [socketClient] when building the socket URI and the
  /// attach message.
  final String sessionId;

  /// Builds the socket URI and the attach/input/resize messages.
  final AgentSocketClient socketClient;

  final TerminalSocketTransportFactory _transportFactory;

  /// Transport of the current connection, or `null` when none is held.
  TerminalSocketTransport? _transport;

  /// Subscription to the current transport's stream.
  StreamSubscription<dynamic>? _subscription;

  /// Completer of the [connect] call that is still waiting for the attach
  /// acknowledgement, if any. Tracked so [dispose] can fail that call instead
  /// of leaving it pending forever once the subscription is cancelled.
  Completer<void>? _pendingAttach;

  /// Whether the attach acknowledgement was seen on the current connection.
  bool _isAttached = false;

  /// Opens a transport, sends the attach message, and waits for the
  /// acknowledgement.
  ///
  /// Any existing connection is disposed first. Non-string messages are
  /// dropped. The first frame recognised as an attach acknowledgement
  /// completes this call and is not forwarded; every other string frame is
  /// passed to [onFrame]. [onDisconnected] is invoked when the stream closes
  /// after the acknowledgement was received.
  ///
  /// Throws a [StateError] if the stream closes, or the session is disposed,
  /// before the acknowledgement arrives. Rethrows a stream error received
  /// before the acknowledgement, and any error thrown while sending the attach
  /// message. On failure the connection opened by this call is torn down.
  Future<void> connect({
    required void Function(String frame) onFrame,
    void Function()? onDisconnected,
  }) async {
    if (_transport != null || _subscription != null) {
      await dispose();
    }
    final transport = _transportFactory(
      socketClient.buildTerminalSocketUri(sessionId),
    );
    _transport = transport;
    final attachedCompleter = Completer<void>();
    _pendingAttach = attachedCompleter;
    _subscription = transport.stream.listen(
      (message) {
        if (message is! String) {
          return;
        }
        if (!_isAttached && _handleAttachedAck(message)) {
          _isAttached = true;
          if (!attachedCompleter.isCompleted) {
            attachedCompleter.complete();
          }
          return;
        }
        onFrame(message);
      },
      onError: (Object error, StackTrace stackTrace) {
        // Only errors that arrive before the acknowledgement fail connect();
        // later stream errors are not surfaced here.
        if (!attachedCompleter.isCompleted) {
          attachedCompleter.completeError(error, stackTrace);
        }
      },
      onDone: () {
        if (!attachedCompleter.isCompleted) {
          attachedCompleter.completeError(
            StateError('Terminal socket closed before attach acknowledgement.'),
          );
          return;
        }
        onDisconnected?.call();
      },
    );
    try {
      // Sending inside the try ensures a failing send also tears down the
      // transport and subscription created above.
      transport.send(jsonEncode(socketClient.buildAttachMessage(sessionId)));
      await attachedCompleter.future;
    } catch (_) {
      // Detach the completer first: if the send threw, nobody is listening to
      // its future, so dispose() must not complete it with an error.
      _releasePendingAttach(attachedCompleter);
      // Tear down only if this attempt still owns the connection; otherwise a
      // dispose() has already handled this transport and a newer connection
      // must not be closed by mistake.
      if (identical(_transport, transport)) {
        await dispose();
      }
      rethrow;
    }
    _releasePendingAttach(attachedCompleter);
  }

  /// Sends [input] to the terminal and reports what happened.
  ///
  /// Returns [TerminalSocketDispatchResult.emptyInput] for an empty string,
  /// [TerminalSocketDispatchResult.noTransport] when no transport is held, and
  /// [TerminalSocketDispatchResult.sent] after handing the encoded input
  /// message to the transport.
  TerminalSocketDispatchResult sendInput(String input) {
    final transport = _transport;
    if (input.isEmpty) {
      return TerminalSocketDispatchResult.emptyInput;
    }
    if (transport == null) {
      return TerminalSocketDispatchResult.noTransport;
    }
    transport.send(jsonEncode(socketClient.buildInputMessage(input)));
    return TerminalSocketDispatchResult.sent;
  }

  /// Sends a resize message for [columns] x [rows].
  ///
  /// Does nothing when no transport is held or either dimension is not
  /// positive.
  void sendResize(int columns, int rows) {
    final transport = _transport;
    if (transport == null || columns <= 0 || rows <= 0) {
      return;
    }
    transport.send(jsonEncode(socketClient.buildResizeMessage(columns, rows)));
  }

  /// Cancels the stream subscription and closes the transport.
  ///
  /// A [connect] call still waiting for the attach acknowledgement fails with
  /// a [StateError]. Errors from closing the transport are ignored. Calling
  /// this when nothing is connected is a no-op.
  Future<void> dispose() async {
    final subscription = _subscription;
    final transport = _transport;
    final pendingAttach = _pendingAttach;
    _subscription = null;
    _transport = null;
    _pendingAttach = null;
    _isAttached = false;
    // Once the subscription is cancelled neither onError nor onDone can fire,
    // so a pending connect() has to be failed explicitly here.
    if (pendingAttach != null && !pendingAttach.isCompleted) {
      pendingAttach.completeError(
        StateError(
          'Terminal socket session disposed before attach acknowledgement.',
        ),
      );
    }
    try {
      await subscription?.cancel();
    } finally {
      // Close the transport even when cancelling the subscription throws.
      try {
        await transport?.close();
      } catch (_) {
        // Best-effort close: the connection is being discarded anyway.
      }
    }
  }

  /// Clears [_pendingAttach] if it still refers to [completer].
  void _releasePendingAttach(Completer<void> completer) {
    if (identical(_pendingAttach, completer)) {
      _pendingAttach = null;
    }
  }

  /// Returns whether [frame] is a JSON object whose `type` is `attached`.
  ///
  /// Frames that are not valid JSON are treated as ordinary frames.
  bool _handleAttachedAck(String frame) {
    try {
      final decoded = jsonDecode(frame);
      if (decoded is Map && decoded['type'] == 'attached') {
        return true;
      }
    } catch (_) {
      // Deliberately ignored: a frame that cannot be decoded is simply not an
      // acknowledgement.
    }
    return false;
  }
}
/// Outcome of [TerminalSocketSession.sendInput].
enum TerminalSocketDispatchResult {
  /// The input message was handed to the transport.
  sent,

  /// The session holds no transport, so nothing was sent.
  noTransport,

  /// The input string was empty, so nothing was sent.
  emptyInput,
}
/// Transport abstraction that [TerminalSocketSession] uses to exchange
/// messages with the remote end.
abstract class TerminalSocketTransport {
/// Messages received from the remote end.
Stream<dynamic> get stream;
/// Sends [message] to the remote end.
void send(String message);
/// Closes the transport.
Future<void> close();
}
/// [TerminalSocketTransport] backed by a [WebSocketChannel].
class WebSocketTerminalSocketTransport implements TerminalSocketTransport {
  /// Wraps an existing [WebSocketChannel].
  WebSocketTerminalSocketTransport(this._channel);

  /// Opens a channel to [uri] via [WebSocketChannel.connect] and wraps it.
  static WebSocketTerminalSocketTransport connect(Uri uri) {
    final channel = WebSocketChannel.connect(uri);
    return WebSocketTerminalSocketTransport(channel);
  }

  /// Underlying channel that all operations delegate to.
  final WebSocketChannel _channel;

  /// The channel's incoming message stream.
  @override
  Stream<dynamic> get stream {
    return _channel.stream;
  }

  /// Adds [message] to the channel's sink.
  @override
  void send(String message) => _channel.sink.add(message);

  /// Closes the channel's sink.
  @override
  Future<void> close() => _channel.sink.close();
}