From 3d06ee0a199f7fc5a83d994086c61827395a2c0d Mon Sep 17 00:00:00 2001 From: sladro Date: Sat, 11 Apr 2026 07:57:56 +0800 Subject: [PATCH] fix terminal recovery and isolate output pumps --- .gitignore | 1 + .../lib/core/network/agent_socket_client.dart | 10 +- .../lib/features/terminal/terminal_page.dart | 22 +--- .../terminal/terminal_restore_decision.dart | 27 ---- .../terminal/terminal_restore_payload.dart | 14 ++ .../terminal_session_coordinator.dart | 103 +++++++++++---- .../terminal/terminal_socket_session.dart | 35 ++++- .../network/agent_socket_client_test.dart | 3 +- .../terminal_restore_decision_test.dart | 6 +- .../terminal_session_coordinator_test.dart | 69 +++++++++- .../terminal_socket_session_test.dart | 31 ++++- apps/mobile_app/test/widget_test.dart | 5 +- .../History/SessionHistoryStore.cs | 53 +++++--- .../History/SessionIoJournalStore.cs | 37 ++++-- .../Realtime/TerminalWebSocketHandler.cs | 120 +++++++++++++++++- .../Sessions/SessionRegistry.cs | 39 +++++- .../Sessions/SessionRestoreSnapshot.cs | 5 +- .../Sessions/TerminalInputReceiptTracker.cs | 45 +++++++ .../Terminal/PowerShellSessionHost.cs | 81 +++++++++--- .../Realtime/TerminalWebSocketHandlerTests.cs | 110 +++++++++++++++- .../Terminal/PowerShellSessionHostTests.cs | 64 +++++++++- 21 files changed, 742 insertions(+), 138 deletions(-) create mode 100644 apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/TerminalInputReceiptTracker.cs diff --git a/.gitignore b/.gitignore index 240aad2..49d5064 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ Thumbs.db data/ certs/ .superpowers/ +.codex/ .worktrees/ work/*.log work/dotnet-*/ diff --git a/apps/mobile_app/lib/core/network/agent_socket_client.dart b/apps/mobile_app/lib/core/network/agent_socket_client.dart index e2be05e..b62fc90 100644 --- a/apps/mobile_app/lib/core/network/agent_socket_client.dart +++ b/apps/mobile_app/lib/core/network/agent_socket_client.dart @@ -16,10 +16,12 @@ class AgentSocketClient { Map buildAttachMessage(String sessionId) => {'type': 'attach', 'sessionId': sessionId}; - Map buildInputMessage(String input) => { - 'type': 'input', - 'input': input, - }; + Map buildInputMessage(String input, {String? inputId}) => + { + 'type': 'input', + 'input': input, + if (inputId != null && inputId.isNotEmpty) 'inputId': inputId, + }; Map buildResizeMessage(int columns, int rows) => {'type': 'resize', 'columns': columns, 'rows': rows}; diff --git a/apps/mobile_app/lib/features/terminal/terminal_page.dart b/apps/mobile_app/lib/features/terminal/terminal_page.dart index 402ccc0..38cc5e8 100644 --- a/apps/mobile_app/lib/features/terminal/terminal_page.dart +++ b/apps/mobile_app/lib/features/terminal/terminal_page.dart @@ -21,7 +21,6 @@ import 'history_window.dart'; import 'repeatable_terminal_key_button.dart'; import 'terminal_interaction_controller.dart'; import 'terminal_restore_payload.dart'; -import 'terminal_restore_decision.dart'; import 'terminal_session_coordinator.dart'; import 'terminal_snapshot.dart'; import 'terminal_snapshot_storage.dart'; @@ -460,19 +459,10 @@ class _TerminalPageState extends ConsumerState _receivedRestorePayload = true; _awaitingReconnectRestore = false; _cancelHistorySeedTimer(); - final combined = restore.screenText + restore.pendingInput; - if (combined.isEmpty) { - _scheduleSnapshotPersist(); - return; - } - - final decision = decideTerminalRestore( - currentText: terminal.buffer.getText(), - restoreText: combined, - ); - if (decision == TerminalRestoreDecision.replaceWithRestore) { - _resetTerminalForReplay(); - terminal.write(combined); + final restoreFrame = restore.buildReplayFrame(); + _resetTerminalForReplay(); + if (restoreFrame.isNotEmpty) { + terminal.write(restoreFrame); } _historySeeded = _terminalHasVisibleContent; _scheduleSnapshotPersist(); @@ -568,10 +558,6 @@ class _TerminalPageState extends ConsumerState terminal.buffer.getText().trim().isNotEmpty; void _resetTerminalForReplay() { - if (!_terminalHasVisibleContent) { - return; - } - terminal.buffer.clear(); terminal.buffer.setCursor(0, 0); terminal.notifyListeners(); diff --git a/apps/mobile_app/lib/features/terminal/terminal_restore_decision.dart b/apps/mobile_app/lib/features/terminal/terminal_restore_decision.dart index 10d65af..15c902f 100644 --- a/apps/mobile_app/lib/features/terminal/terminal_restore_decision.dart +++ b/apps/mobile_app/lib/features/terminal/terminal_restore_decision.dart @@ -4,32 +4,5 @@ TerminalRestoreDecision decideTerminalRestore({ required String currentText, required String restoreText, }) { - if (restoreText.isEmpty) { - return TerminalRestoreDecision.keepLocal; - } - - if (currentText.isEmpty) { - return TerminalRestoreDecision.replaceWithRestore; - } - - final normalizedCurrent = _normalizeTerminalText(currentText); - final normalizedRestore = _normalizeTerminalText(restoreText); - - if (normalizedCurrent.isEmpty) { - return TerminalRestoreDecision.replaceWithRestore; - } - - if (normalizedCurrent == normalizedRestore) { - return TerminalRestoreDecision.keepLocal; - } - - if (normalizedCurrent.startsWith(normalizedRestore)) { - return TerminalRestoreDecision.keepLocal; - } - return TerminalRestoreDecision.replaceWithRestore; } - -String _normalizeTerminalText(String text) { - return text.replaceAll('\r\n', '\n').replaceAll('\r', '\n'); -} diff --git a/apps/mobile_app/lib/features/terminal/terminal_restore_payload.dart b/apps/mobile_app/lib/features/terminal/terminal_restore_payload.dart index 93f55b9..ca0310e 100644 --- a/apps/mobile_app/lib/features/terminal/terminal_restore_payload.dart +++ b/apps/mobile_app/lib/features/terminal/terminal_restore_payload.dart @@ -1,15 +1,24 @@ +import 'terminal_screen_snapshot.dart'; + class TerminalRestorePayload { const TerminalRestorePayload({ required this.sessionId, required this.sequence, required this.screenText, required this.pendingInput, + this.screenSnapshot, }); final String sessionId; final int sequence; final String screenText; final String pendingInput; + final TerminalScreenSnapshot? screenSnapshot; + + String buildReplayFrame() { + final screenReplay = screenSnapshot?.toReplaySequence() ?? screenText; + return '$screenReplay$pendingInput'; + } factory TerminalRestorePayload.fromJson(Map json) { return TerminalRestorePayload( @@ -17,6 +26,11 @@ class TerminalRestorePayload { sequence: json['sequence'] as int, screenText: (json['screenText'] as String?) ?? '', pendingInput: (json['pendingInput'] as String?) ?? '', + screenSnapshot: json['screenSnapshot'] is Map + ? TerminalScreenSnapshot.fromJson( + Map.from(json['screenSnapshot'] as Map), + ) + : null, ); } } diff --git a/apps/mobile_app/lib/features/terminal/terminal_session_coordinator.dart b/apps/mobile_app/lib/features/terminal/terminal_session_coordinator.dart index b201674..eeaf828 100644 --- a/apps/mobile_app/lib/features/terminal/terminal_session_coordinator.dart +++ b/apps/mobile_app/lib/features/terminal/terminal_session_coordinator.dart @@ -80,7 +80,7 @@ class TerminalSessionCoordinator extends ChangeNotifier { bool _connectionAttemptInProgress = false; bool _reconnectPending = false; String _connectionStatus = 'Connecting...'; - final List _pendingInputs = []; + final List<_PendingInputDispatch> _pendingInputs = <_PendingInputDispatch>[]; int _pendingInputCharacterCount = 0; int _sessionGeneration = 0; int? _pendingResizeColumns; @@ -90,6 +90,7 @@ class TerminalSessionCoordinator extends ChangeNotifier { int? _lastReceivedSequence; int? _recoveryGapBaselineSequence; bool _isBackendResizeEnabled = true; + int _nextInputId = 0; bool get isLoadingOlderHistory => _isLoadingOlderHistory; @@ -161,6 +162,13 @@ class TerminalSessionCoordinator extends ChangeNotifier { _handleRestore(restore); }, + onInputAck: (inputId) { + if (!_isCurrentSession(socketSession, sessionGeneration)) { + return; + } + + _handleInputAck(inputId); + }, onDisconnected: () { if (!_isCurrentSession(socketSession, sessionGeneration)) { return; @@ -214,7 +222,7 @@ class TerminalSessionCoordinator extends ChangeNotifier { final socketSession = _socketSession; if (socketSession == null) { if (_shouldBufferInput(input)) { - _bufferInput(input); + _enqueuePendingInput(input); diagnosticLog?.add( 'socket.input.buffer', 'reason=no-session input=${_formatInputForDiagnostics(input)}', @@ -229,27 +237,37 @@ class TerminalSessionCoordinator extends ChangeNotifier { return; } - final result = socketSession.sendInput(input); + final pendingInput = _enqueuePendingInput(input); + if (pendingInput == null) { + diagnosticLog?.add('socket.input.skip', 'reason=empty-input'); + return; + } + + final result = socketSession.sendInput( + input, + inputId: pendingInput.inputId, + ); switch (result) { case TerminalSocketDispatchResult.sent: diagnosticLog?.add( 'socket.input.tx', - _formatInputForDiagnostics(input), + 'id=${pendingInput.inputId} ${_formatInputForDiagnostics(input)}', ); case TerminalSocketDispatchResult.noTransport: if (_shouldBufferInput(input)) { - _bufferInput(input); diagnosticLog?.add( 'socket.input.buffer', - 'reason=no-transport input=${_formatInputForDiagnostics(input)}', + 'reason=no-transport id=${pendingInput.inputId} input=${_formatInputForDiagnostics(input)}', ); } else { + _removePendingInputById(pendingInput.inputId); diagnosticLog?.add( 'socket.input.skip', - 'reason=no-transport input=${_formatInputForDiagnostics(input)}', + 'reason=no-transport id=${pendingInput.inputId} input=${_formatInputForDiagnostics(input)}', ); } case TerminalSocketDispatchResult.emptyInput: + _removePendingInputById(pendingInput.inputId); diagnosticLog?.add('socket.input.skip', 'reason=empty-input'); } } @@ -361,6 +379,12 @@ class TerminalSessionCoordinator extends ChangeNotifier { } } + void _handleInputAck(String inputId) { + if (_removePendingInputById(inputId)) { + diagnosticLog?.add('socket.input.ack', inputId); + } + } + Future loadRecentHistoryWindow() async { var history = controller.historyWindow.outputSeedText.isNotEmpty ? controller.historyWindow @@ -555,18 +579,23 @@ class TerminalSessionCoordinator extends ChangeNotifier { return _connectionAttemptInProgress || _reconnectPending; } - void _bufferInput(String input) { + _PendingInputDispatch? _enqueuePendingInput(String input) { if (input.isEmpty) { - return; + return null; } - _pendingInputs.add(input); + final dispatch = _PendingInputDispatch( + inputId: _buildPendingInputId(), + input: input, + ); + _pendingInputs.add(dispatch); _pendingInputCharacterCount += input.length; while (_pendingInputCharacterCount > pendingInputCharacterLimit && _pendingInputs.isNotEmpty) { final removed = _pendingInputs.removeAt(0); - _pendingInputCharacterCount -= removed.length; + _pendingInputCharacterCount -= removed.input.length; } + return dispatch; } void _flushPendingInputs(TerminalSocketSession socketSession) { @@ -574,31 +603,46 @@ class TerminalSessionCoordinator extends ChangeNotifier { return; } - final pendingInputs = List.of(_pendingInputs); - _pendingInputs.clear(); - _pendingInputCharacterCount = 0; - - for (var index = 0; index < pendingInputs.length; index += 1) { - final input = pendingInputs[index]; - final result = socketSession.sendInput(input); + for (final pendingInput in _pendingInputs) { + final result = socketSession.sendInput( + pendingInput.input, + inputId: pendingInput.inputId, + ); if (result == TerminalSocketDispatchResult.sent) { diagnosticLog?.add( 'socket.input.flush', - _formatInputForDiagnostics(input), + 'id=${pendingInput.inputId} ${_formatInputForDiagnostics(pendingInput.input)}', ); continue; } if (result == TerminalSocketDispatchResult.noTransport) { - for (final remainingInput in pendingInputs.skip(index)) { - _bufferInput(remainingInput); - } + diagnosticLog?.add( + 'socket.input.flush.pause', + pendingInput.inputId, + ); } break; } } + bool _removePendingInputById(String inputId) { + final index = _pendingInputs.indexWhere( + (pendingInput) => pendingInput.inputId == inputId, + ); + if (index < 0) { + return false; + } + + final removed = _pendingInputs.removeAt(index); + _pendingInputCharacterCount -= removed.input.length; + if (_pendingInputCharacterCount < 0) { + _pendingInputCharacterCount = 0; + } + return true; + } + void _flushPendingResize() { _cancelResize = null; @@ -750,6 +794,11 @@ class TerminalSessionCoordinator extends ChangeNotifier { static String _compactControlText(String payload) { return payload.replaceAll('\r', r'\r').replaceAll('\n', r'\n'); } + + String _buildPendingInputId() { + _nextInputId += 1; + return '${session.sessionId}-input-$_nextInputId'; + } } class _JournalItem { @@ -774,3 +823,13 @@ class _JournalItem { ); } } + +class _PendingInputDispatch { + const _PendingInputDispatch({ + required this.inputId, + required this.input, + }); + + final String inputId; + final String input; +} diff --git a/apps/mobile_app/lib/features/terminal/terminal_socket_session.dart b/apps/mobile_app/lib/features/terminal/terminal_socket_session.dart index 40aab3e..04d5087 100644 --- a/apps/mobile_app/lib/features/terminal/terminal_socket_session.dart +++ b/apps/mobile_app/lib/features/terminal/terminal_socket_session.dart @@ -59,6 +59,7 @@ class TerminalSocketSession { Future connect({ required void Function(TerminalOutputPayload output) onOutput, required void Function(TerminalRestorePayload restore) onRestore, + void Function(String inputId)? onInputAck, void Function()? onDisconnected, }) async { if (_transport != null || _subscription != null) { @@ -89,6 +90,10 @@ class TerminalSocketSession { return; } + if (_handleInputAckFrame(message, onInputAck)) { + return; + } + final output = _decodeOutputFrame(message); if (output != null) { onOutput(output); @@ -132,7 +137,7 @@ class TerminalSocketSession { } } - TerminalSocketDispatchResult sendInput(String input) { + TerminalSocketDispatchResult sendInput(String input, {String? inputId}) { final transport = _transport; if (input.isEmpty) { return TerminalSocketDispatchResult.emptyInput; @@ -143,7 +148,9 @@ class TerminalSocketSession { } try { - transport.send(jsonEncode(socketClient.buildInputMessage(input))); + transport.send( + jsonEncode(socketClient.buildInputMessage(input, inputId: inputId)), + ); return TerminalSocketDispatchResult.sent; } catch (_) { _handleTransportClosed(transport); @@ -208,6 +215,30 @@ class TerminalSocketSession { return false; } + bool _handleInputAckFrame( + String frame, + void Function(String inputId)? onInputAck, + ) { + if (onInputAck == null) { + return false; + } + + try { + final decoded = jsonDecode(frame); + if (decoded is Map && + decoded['type'] == 'inputAck' && + _matchesSessionId(decoded)) { + final inputId = decoded['inputId']; + if (inputId is String && inputId.isNotEmpty) { + onInputAck(inputId); + return true; + } + } + } catch (_) {} + + return false; + } + TerminalOutputPayload? _decodeOutputFrame(String frame) { try { final decoded = jsonDecode(frame); diff --git a/apps/mobile_app/test/core/network/agent_socket_client_test.dart b/apps/mobile_app/test/core/network/agent_socket_client_test.dart index a1a9625..f5d7f3f 100644 --- a/apps/mobile_app/test/core/network/agent_socket_client_test.dart +++ b/apps/mobile_app/test/core/network/agent_socket_client_test.dart @@ -23,9 +23,10 @@ void main() { test('builds input message for terminal input', () { final client = AgentSocketClient(Uri.parse('https://host:9443')); - expect(client.buildInputMessage('ls'), { + expect(client.buildInputMessage('ls', inputId: 'input-1'), { 'type': 'input', 'input': 'ls', + 'inputId': 'input-1', }); }); } diff --git a/apps/mobile_app/test/features/terminal/terminal_restore_decision_test.dart b/apps/mobile_app/test/features/terminal/terminal_restore_decision_test.dart index 7a692d1..172e359 100644 --- a/apps/mobile_app/test/features/terminal/terminal_restore_decision_test.dart +++ b/apps/mobile_app/test/features/terminal/terminal_restore_decision_test.dart @@ -2,13 +2,15 @@ import 'package:flutter_test/flutter_test.dart'; import 'package:term_remote_ctl/features/terminal/terminal_restore_decision.dart'; void main() { - test('keeps the local terminal content when restore is a shorter prefix', () { + test( + 'replaces the local terminal content when restore is a shorter authoritative prefix', + () { final decision = decideTerminalRestore( currentText: 'PS> git status\r\nmodified: file.txt\r\nPS> ', restoreText: 'PS> git status\r\n', ); - expect(decision, TerminalRestoreDecision.keepLocal); + expect(decision, TerminalRestoreDecision.replaceWithRestore); }); test('replaces local content when restore extends the current content', () { diff --git a/apps/mobile_app/test/features/terminal/terminal_session_coordinator_test.dart b/apps/mobile_app/test/features/terminal/terminal_session_coordinator_test.dart index 1046dda..2c088c2 100644 --- a/apps/mobile_app/test/features/terminal/terminal_session_coordinator_test.dart +++ b/apps/mobile_app/test/features/terminal/terminal_session_coordinator_test.dart @@ -6,6 +6,7 @@ import 'package:term_remote_ctl/core/network/agent_socket_client.dart'; import 'package:term_remote_ctl/features/sessions/session.dart'; import 'package:term_remote_ctl/features/terminal/terminal_interaction_controller.dart'; import 'package:term_remote_ctl/features/terminal/terminal_output_payload.dart'; +import 'package:term_remote_ctl/features/terminal/terminal_restore_decision.dart'; import 'package:term_remote_ctl/features/terminal/terminal_restore_payload.dart'; import 'package:term_remote_ctl/features/terminal/terminal_session_coordinator.dart'; import 'package:term_remote_ctl/features/terminal/terminal_socket_session.dart'; @@ -122,9 +123,64 @@ void main() { expect(sessionFactory.createdSessions, hasLength(2)); expect(sessionFactory.createdSessions.last.sentInputs, ['dir\r']); + expect(sessionFactory.createdSessions.last.sentInputIds, ['abc-input-1']); }, ); + test( + 'unacknowledged input is resent after reconnect and removed only after ack', + () async { + final controller = TerminalInteractionController(); + final apiClient = _FakeAgentApiClient(); + final sessionFactory = _FakeTerminalSessionFactory(); + final reconnectScheduler = _FakeReconnectScheduler(); + final session = Session( + sessionId: 'abc', + name: 'codex-main', + status: 'idle', + ); + final coordinator = TerminalSessionCoordinator( + controller: controller, + apiClient: apiClient, + session: session, + sessionFactory: sessionFactory.create, + onFrame: (_) {}, + onRestore: (_) {}, + viewportProvider: () => const TerminalViewport(columns: 80, rows: 24), + reconnectScheduler: reconnectScheduler.schedule, + ); + + await coordinator.start(); + final firstSession = sessionFactory.createdSessions.single; + + coordinator.sendInput('dir\r'); + expect(firstSession.sentInputIds, ['abc-input-1']); + + firstSession.disconnect(); + await reconnectScheduler.runPending(); + + final secondSession = sessionFactory.createdSessions.last; + expect(secondSession.sentInputs, ['dir\r']); + expect(secondSession.sentInputIds, ['abc-input-1']); + + secondSession.ackInput('abc-input-1'); + secondSession.disconnect(); + await reconnectScheduler.runPending(); + + final thirdSession = sessionFactory.createdSessions.last; + expect(thirdSession.sentInputs, isEmpty); + }, + ); + + test('restore payloads are treated as authoritative over provisional text', () { + final decision = decideTerminalRestore( + currentText: 'PS> git status\r\nmodified: file.txt\r\nPS> ', + restoreText: 'PS> git status\r\n', + ); + + expect(decision, TerminalRestoreDecision.replaceWithRestore); + }); + test( 'loadOlderHistory pages backward with beforeSequence instead of expanding lineCount', () async { @@ -920,12 +976,14 @@ class _FakeTerminalSocketSession extends TerminalSocketSession { final bool autoConnect; final resizeCalls = >[]; final sentInputs = []; + final sentInputIds = []; int disposeCount = 0; Completer? disposeCompleter; Completer _connectCompleter = Completer(); void Function(String frame)? _onFrame; void Function(TerminalOutputPayload output)? _onOutput; void Function()? _onDisconnected; + void Function(String inputId)? _onInputAck; bool _isDisconnected = false; void Function(TerminalRestorePayload restore)? _onRestore; @@ -933,10 +991,12 @@ class _FakeTerminalSocketSession extends TerminalSocketSession { Future connect({ required void Function(TerminalOutputPayload output) onOutput, required void Function(TerminalRestorePayload restore) onRestore, + void Function(String inputId)? onInputAck, void Function()? onDisconnected, }) { _onOutput = onOutput; _onRestore = onRestore; + _onInputAck = onInputAck; _onDisconnected = onDisconnected; if (autoConnect && !_connectCompleter.isCompleted) { _connectCompleter.complete(); @@ -950,12 +1010,15 @@ class _FakeTerminalSocketSession extends TerminalSocketSession { } @override - TerminalSocketDispatchResult sendInput(String input) { + TerminalSocketDispatchResult sendInput(String input, {String? inputId}) { if (_isDisconnected || !_connectCompleter.isCompleted) { return TerminalSocketDispatchResult.noTransport; } sentInputs.add(input); + if (inputId != null) { + sentInputIds.add(inputId); + } return TerminalSocketDispatchResult.sent; } @@ -978,6 +1041,10 @@ class _FakeTerminalSocketSession extends TerminalSocketSession { _onDisconnected?.call(); } + void ackInput(String inputId) { + _onInputAck?.call(inputId); + } + @override Future dispose() async { disposeCount += 1; diff --git a/apps/mobile_app/test/features/terminal/terminal_socket_session_test.dart b/apps/mobile_app/test/features/terminal/terminal_socket_session_test.dart index dd54ba5..c9a27ab 100644 --- a/apps/mobile_app/test/features/terminal/terminal_socket_session_test.dart +++ b/apps/mobile_app/test/features/terminal/terminal_socket_session_test.dart @@ -79,11 +79,11 @@ void main() { await Future.delayed(Duration.zero); transport.emit('{"type":"attached","sessionId":"session-123"}'); await connectFuture; - session.sendInput('dir\r'); + session.sendInput('dir\r', inputId: 'input-1'); expect( transport.sentMessages, - contains('{"type":"input","input":"dir\\r"}'), + contains('{"type":"input","input":"dir\\r","inputId":"input-1"}'), ); await session.dispose(); @@ -197,6 +197,33 @@ void main() { expect(outputs.single.chunk, 'live-output'); }); + test('connect routes input acknowledgements to onInputAck', () async { + final transport = _FakeTerminalSocketTransport(); + final session = TerminalSocketSession( + sessionId: 'session-123', + socketClient: AgentSocketClient(Uri.parse('https://host:9443')), + transportFactory: (_) => transport, + ); + + final acknowledgements = []; + final connectFuture = session.connect( + onOutput: (_) {}, + onRestore: (_) {}, + onInputAck: acknowledgements.add, + ); + await Future.delayed(Duration.zero); + + transport.emit('{"type":"attached","sessionId":"session-123"}'); + await connectFuture; + + transport.emit( + '{"type":"inputAck","sessionId":"session-123","inputId":"input-1"}', + ); + await Future.delayed(Duration.zero); + + expect(acknowledgements, ['input-1']); + }); + test('connect ignores unknown json control frames', () async { final transport = _FakeTerminalSocketTransport(); final session = TerminalSocketSession( diff --git a/apps/mobile_app/test/widget_test.dart b/apps/mobile_app/test/widget_test.dart index e4f4f5a..fe07adb 100644 --- a/apps/mobile_app/test/widget_test.dart +++ b/apps/mobile_app/test/widget_test.dart @@ -1326,7 +1326,7 @@ void main() { ); testWidgets( - 'terminal reconnect keeps a richer local snapshot when restore is shorter', + 'terminal reconnect replaces a provisional local snapshot with authoritative restore output', (tester) async { final snapshotStorage = _MemoryTerminalSnapshotStorage([ const TerminalSnapshot( @@ -1360,7 +1360,8 @@ void main() { final terminal = tester .widget(find.byType(TerminalView)) .terminal; - expect(terminal.buffer.getText(), contains('modified: file.txt')); + expect(terminal.buffer.getText(), isNot(contains('modified: file.txt'))); + expect(terminal.buffer.getText(), contains('PS> git status')); }, ); diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionHistoryStore.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionHistoryStore.cs index 829aaa3..47c0860 100644 --- a/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionHistoryStore.cs +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionHistoryStore.cs @@ -1,10 +1,12 @@ using System.Text; +using System.Collections.Concurrent; namespace TermRemoteCtl.Agent.History; public sealed class SessionHistoryStore { private static readonly UTF8Encoding Utf8WithoutBom = new(false); + private readonly ConcurrentDictionary _writeGates = new(); private readonly string _historyRootPath; public SessionHistoryStore(string rootPath) @@ -19,30 +21,47 @@ public sealed class SessionHistoryStore ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); ArgumentNullException.ThrowIfNull(chunk); - var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log"); - await using var stream = new FileStream( - filePath, - FileMode.Append, - FileAccess.Write, - FileShare.Read, - 4096, - FileOptions.Asynchronous); - await using var writer = new StreamWriter(stream, Utf8WithoutBom); - await writer.WriteAsync(chunk.AsMemory(), cancellationToken); - await writer.FlushAsync(cancellationToken); + var gate = _writeGates.GetOrAdd(sessionId, static _ => new SemaphoreSlim(1, 1)); + await gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log"); + await using var stream = new FileStream( + filePath, + FileMode.Append, + FileAccess.Write, + FileShare.Read, + 4096, + FileOptions.Asynchronous); + await using var writer = new StreamWriter(stream, Utf8WithoutBom); + await writer.WriteAsync(chunk.AsMemory(), cancellationToken).ConfigureAwait(false); + await writer.FlushAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + gate.Release(); + } } - public Task DeleteAsync(string sessionId, CancellationToken cancellationToken) + public async Task DeleteAsync(string sessionId, CancellationToken cancellationToken) { ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); cancellationToken.ThrowIfCancellationRequested(); - var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log"); - if (File.Exists(filePath)) + var gate = _writeGates.GetOrAdd(sessionId, static _ => new SemaphoreSlim(1, 1)); + await gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try { - File.Delete(filePath); + var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log"); + if (File.Exists(filePath)) + { + File.Delete(filePath); + } + } + finally + { + gate.Release(); + _writeGates.TryRemove(sessionId, out _); } - - return Task.CompletedTask; } } diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionIoJournalStore.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionIoJournalStore.cs index 7b89ce7..82d018f 100644 --- a/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionIoJournalStore.cs +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/History/SessionIoJournalStore.cs @@ -1,5 +1,6 @@ using System.Text; using System.Text.Json; +using System.Collections.Concurrent; namespace TermRemoteCtl.Agent.History; @@ -11,6 +12,7 @@ public sealed class SessionIoJournalStore }; private static readonly UTF8Encoding Utf8WithoutBom = new(false); + private readonly ConcurrentDictionary _writeGates = new(); private readonly string _sessionRoot; public SessionIoJournalStore(string rootPath) @@ -25,9 +27,18 @@ public sealed class SessionIoJournalStore { ArgumentNullException.ThrowIfNull(ioEvent); - var filePath = Path.Combine(_sessionRoot, $"{ioEvent.SessionId}.io.jsonl"); - var line = JsonSerializer.Serialize(ioEvent, SerializerOptions) + Environment.NewLine; - await File.AppendAllTextAsync(filePath, line, Utf8WithoutBom, cancellationToken).ConfigureAwait(false); + var gate = _writeGates.GetOrAdd(ioEvent.SessionId, static _ => new SemaphoreSlim(1, 1)); + await gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + var filePath = Path.Combine(_sessionRoot, $"{ioEvent.SessionId}.io.jsonl"); + var line = JsonSerializer.Serialize(ioEvent, SerializerOptions) + Environment.NewLine; + await File.AppendAllTextAsync(filePath, line, Utf8WithoutBom, cancellationToken).ConfigureAwait(false); + } + finally + { + gate.Release(); + } } public async Task ReadAsync( @@ -89,18 +100,26 @@ public sealed class SessionIoJournalStore CurrentSequence: currentSequence); } - public Task DeleteAsync(string sessionId, CancellationToken cancellationToken) + public async Task DeleteAsync(string sessionId, CancellationToken cancellationToken) { ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); cancellationToken.ThrowIfCancellationRequested(); - var filePath = Path.Combine(_sessionRoot, $"{sessionId}.io.jsonl"); - if (File.Exists(filePath)) + var gate = _writeGates.GetOrAdd(sessionId, static _ => new SemaphoreSlim(1, 1)); + await gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try { - File.Delete(filePath); + var filePath = Path.Combine(_sessionRoot, $"{sessionId}.io.jsonl"); + if (File.Exists(filePath)) + { + File.Delete(filePath); + } + } + finally + { + gate.Release(); + _writeGates.TryRemove(sessionId, out _); } - - return Task.CompletedTask; } private async Task> ReadAllAsync(string sessionId, CancellationToken cancellationToken) diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/Realtime/TerminalWebSocketHandler.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/Realtime/TerminalWebSocketHandler.cs index 36e1e32..e2db292 100644 --- a/apps/windows_agent/src/TermRemoteCtl.Agent/Realtime/TerminalWebSocketHandler.cs +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/Realtime/TerminalWebSocketHandler.cs @@ -2,6 +2,7 @@ using System.Net.WebSockets; using System.Text; using System.Text.Json; using TermRemoteCtl.Agent.Sessions; +using TermRemoteCtl.Agent.Terminal.Screen; using TermRemoteCtl.Agent.Terminal; namespace TermRemoteCtl.Agent.Realtime; @@ -82,11 +83,19 @@ public static class TerminalWebSocketHandler restore.ScreenText, restore.PendingInput, restore.CursorRow, - restore.CursorColumn), + restore.CursorColumn, + MapScreenSnapshot(restore.ScreenSnapshot)), sendGate, context.RequestAborted).ConfigureAwait(false); host.OutputReceived += HandleOutput; - await ReceiveLoopAsync(context, socket, host, registry, diagnostics, sessionId).ConfigureAwait(false); + await ReceiveLoopAsync( + context, + socket, + host, + registry, + diagnostics, + sessionId, + sendGate).ConfigureAwait(false); } finally { @@ -107,7 +116,8 @@ public static class TerminalWebSocketHandler ISessionHost host, SessionRegistry registry, ITerminalDiagnosticsSink diagnostics, - string sessionId) + string sessionId, + SemaphoreSlim sendGate) { var buffer = new byte[4096]; @@ -135,20 +145,24 @@ public static class TerminalWebSocketHandler await HandleClientMessageAsync( Encoding.UTF8.GetString(message.ToArray()), + socket, registry, host, diagnostics, sessionId, + sendGate, context.RequestAborted).ConfigureAwait(false); } } private static async Task HandleClientMessageAsync( string payload, + WebSocket socket, SessionRegistry registry, ISessionHost host, ITerminalDiagnosticsSink diagnostics, string sessionId, + SemaphoreSlim sendGate, CancellationToken cancellationToken) { TerminalClientMessage? message; @@ -178,9 +192,47 @@ public static class TerminalWebSocketHandler { if (!string.IsNullOrEmpty(message.Input)) { - await registry.RecordInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false); - diagnostics.Record("backend.input.received", sessionId, SanitizeDiagnosticText(message.Input)); - await host.WriteInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(message.InputId)) + { + if (!registry.TryBeginInputReceipt(sessionId, message.InputId, out var existingReceipt)) + { + if (existingReceipt is not null && await existingReceipt.ConfigureAwait(false)) + { + await SendJsonAsync( + socket, + new TerminalInputAckResponse(sessionId, message.InputId), + sendGate, + cancellationToken).ConfigureAwait(false); + } + + return; + } + } + + try + { + diagnostics.Record("backend.input.received", sessionId, SanitizeDiagnosticText(message.Input)); + await host.WriteInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false); + await registry.RecordInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(message.InputId)) + { + registry.CompleteInputReceipt(sessionId, message.InputId, succeeded: true); + await SendJsonAsync( + socket, + new TerminalInputAckResponse(sessionId, message.InputId), + sendGate, + cancellationToken).ConfigureAwait(false); + } + } + catch + { + if (!string.IsNullOrWhiteSpace(message.InputId)) + { + registry.CompleteInputReceipt(sessionId, message.InputId, succeeded: false); + } + + throw; + } } return; @@ -202,6 +254,36 @@ public static class TerminalWebSocketHandler return input.Replace("\r", "\\r", StringComparison.Ordinal).Replace("\n", "\\n", StringComparison.Ordinal); } + private static TerminalScreenSnapshotResponse? MapScreenSnapshot( + TerminalScreenSnapshot? snapshot) + { + if (snapshot is null) + { + return null; + } + + return new TerminalScreenSnapshotResponse( + snapshot.ScreenVersion, + snapshot.SourceSequence, + snapshot.Rows, + snapshot.Columns, + snapshot.CursorRow, + snapshot.CursorColumn, + snapshot.CursorVisible, + snapshot.ActiveBuffer, + MapScreenBuffer(snapshot.PrimaryBuffer), + snapshot.AlternateBuffer is null ? null : MapScreenBuffer(snapshot.AlternateBuffer)); + } + + private static TerminalScreenBufferSnapshotResponse MapScreenBuffer( + TerminalScreenBufferSnapshot buffer) + { + return new TerminalScreenBufferSnapshotResponse( + buffer.Viewport + .Select(static line => new TerminalScreenLineSnapshotResponse(line.Index, line.Text)) + .ToArray()); + } + private static async Task SendJsonAsync( WebSocket socket, object response, @@ -244,6 +326,7 @@ public static class TerminalWebSocketHandler string PendingInput, int? CursorRow, int? CursorColumn, + TerminalScreenSnapshotResponse? ScreenSnapshot, string Type = "restore"); private sealed record TerminalOutputResponse( @@ -252,10 +335,35 @@ public static class TerminalWebSocketHandler string Chunk, string Type = "output"); + private sealed record TerminalInputAckResponse( + string SessionId, + string InputId, + string Type = "inputAck"); + + private sealed record TerminalScreenSnapshotResponse( + long ScreenVersion, + long SourceSequence, + int Rows, + int Columns, + int CursorRow, + int CursorColumn, + bool CursorVisible, + string ActiveBuffer, + TerminalScreenBufferSnapshotResponse PrimaryBuffer, + TerminalScreenBufferSnapshotResponse? AlternateBuffer); + + private sealed record TerminalScreenBufferSnapshotResponse( + IReadOnlyList Viewport); + + private sealed record TerminalScreenLineSnapshotResponse( + int Index, + string Text); + private sealed record TerminalClientMessage( string Type, string? SessionId, string? Input, + string? InputId, int? Columns, int? Rows); } diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRegistry.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRegistry.cs index ea5f1a3..5445084 100644 --- a/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRegistry.cs +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRegistry.cs @@ -15,6 +15,7 @@ public sealed class SessionRegistry private readonly ConcurrentDictionary _historyBySession = new(); private readonly ConcurrentDictionary _replayBySession = new(); private readonly ConcurrentDictionary _pendingInputEchoBySession = new(); + private readonly ConcurrentDictionary _inputReceiptsBySession = new(); private readonly ConcurrentDictionary _screenBySession = new(); private readonly ConcurrentDictionary _sequenceBySession = new(); private readonly SessionHistoryStore _historyStore; @@ -54,6 +55,7 @@ public sealed class SessionRegistry _historyBySession[record.SessionId] = new TerminalRingBuffer(_ringBufferLineLimit); _replayBySession[record.SessionId] = new TerminalReplayBuffer(ReplayCharacterLimit); _pendingInputEchoBySession[record.SessionId] = new PendingInputEchoTracker(); + _inputReceiptsBySession[record.SessionId] = new TerminalInputReceiptTracker(); if (_enableBackendScreenProtocol) { _screenBySession[record.SessionId] = new TerminalScreenEngine(); @@ -302,6 +304,9 @@ public sealed class SessionRegistry sessionId, _ => new PendingInputEchoTracker()); var sequence = GetCurrentSequence(sessionId); + var screenSnapshot = _enableBackendScreenProtocol + ? _screenBySession.GetOrAdd(sessionId, _ => new TerminalScreenEngine()).CreateSnapshot(sessionId) + : null; return new SessionRestoreSnapshot( sessionId, @@ -309,7 +314,38 @@ public sealed class SessionRegistry replay.GetSnapshot(), pendingInputEcho.GetVisibleSuffix(), null, - null); + null, + screenSnapshot); + } + + public bool TryBeginInputReceipt( + string sessionId, + string inputId, + out Task? existingReceipt) + { + ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); + ArgumentException.ThrowIfNullOrWhiteSpace(inputId); + + if (!_records.ContainsKey(sessionId)) + { + throw new KeyNotFoundException($"Session '{sessionId}' was not found."); + } + + var tracker = _inputReceiptsBySession.GetOrAdd( + sessionId, + _ => new TerminalInputReceiptTracker()); + return tracker.TryBegin(inputId, out existingReceipt); + } + + public void CompleteInputReceipt(string sessionId, string inputId, bool succeeded) + { + ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); + ArgumentException.ThrowIfNullOrWhiteSpace(inputId); + + var tracker = _inputReceiptsBySession.GetOrAdd( + sessionId, + _ => new TerminalInputReceiptTracker()); + tracker.Complete(inputId, succeeded); } public TerminalScreenSnapshot GetScreenSnapshot(string sessionId) @@ -354,6 +390,7 @@ public sealed class SessionRegistry _historyBySession.TryRemove(sessionId, out _); _replayBySession.TryRemove(sessionId, out _); _pendingInputEchoBySession.TryRemove(sessionId, out _); + _inputReceiptsBySession.TryRemove(sessionId, out _); _screenBySession.TryRemove(sessionId, out _); _sequenceBySession.TryRemove(sessionId, out _); await _historyStore.DeleteAsync(sessionId, cancellationToken).ConfigureAwait(false); diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRestoreSnapshot.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRestoreSnapshot.cs index 2a9331e..46a30aa 100644 --- a/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRestoreSnapshot.cs +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/SessionRestoreSnapshot.cs @@ -1,3 +1,5 @@ +using TermRemoteCtl.Agent.Terminal.Screen; + namespace TermRemoteCtl.Agent.Sessions; public sealed record SessionRestoreSnapshot( @@ -6,4 +8,5 @@ public sealed record SessionRestoreSnapshot( string ScreenText, string PendingInput, int? CursorRow, - int? CursorColumn); + int? CursorColumn, + TerminalScreenSnapshot? ScreenSnapshot); diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/TerminalInputReceiptTracker.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/TerminalInputReceiptTracker.cs new file mode 100644 index 0000000..dd8c98a --- /dev/null +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/Sessions/TerminalInputReceiptTracker.cs @@ -0,0 +1,45 @@ +using System.Collections.Concurrent; + +namespace TermRemoteCtl.Agent.Sessions; + +internal sealed class TerminalInputReceiptTracker +{ + private readonly ConcurrentDictionary> _receipts = new(); + + public bool TryBegin(string inputId, out Task? existingReceipt) + { + ArgumentException.ThrowIfNullOrWhiteSpace(inputId); + + var completionSource = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + if (_receipts.TryAdd(inputId, completionSource)) + { + existingReceipt = null; + return true; + } + + existingReceipt = _receipts[inputId].Task; + return false; + } + + public void Complete(string inputId, bool succeeded) + { + ArgumentException.ThrowIfNullOrWhiteSpace(inputId); + + if (!_receipts.TryGetValue(inputId, out var completionSource)) + { + return; + } + + if (succeeded) + { + completionSource.TrySetResult(true); + return; + } + + if (_receipts.TryRemove(inputId, out var removed)) + { + removed.TrySetResult(false); + } + } +} diff --git a/apps/windows_agent/src/TermRemoteCtl.Agent/Terminal/PowerShellSessionHost.cs b/apps/windows_agent/src/TermRemoteCtl.Agent/Terminal/PowerShellSessionHost.cs index e674c5e..2dedbc6 100644 --- a/apps/windows_agent/src/TermRemoteCtl.Agent/Terminal/PowerShellSessionHost.cs +++ b/apps/windows_agent/src/TermRemoteCtl.Agent/Terminal/PowerShellSessionHost.cs @@ -11,14 +11,8 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable private readonly IConPtySessionFactory _sessionFactory; private readonly SessionRegistry _sessionRegistry; private readonly ConcurrentDictionary _sessions = new(StringComparer.Ordinal); - private readonly Channel<(string SessionId, string Chunk)> _outputChannel = Channel.CreateUnbounded<(string SessionId, string Chunk)>( - new UnboundedChannelOptions - { - SingleReader = true, - SingleWriter = false, - }); + private readonly ConcurrentDictionary _outputProcessors = new(StringComparer.Ordinal); private readonly CancellationTokenSource _disposeCancellation = new(); - private readonly Task _outputPump; public PowerShellSessionHost( IConPtySessionFactory sessionFactory, @@ -26,7 +20,6 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable { _sessionFactory = sessionFactory; _sessionRegistry = sessionRegistry; - _outputPump = Task.Run(() => PumpOutputAsync(_disposeCancellation.Token)); } public event EventHandler? OutputReceived; @@ -62,6 +55,7 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable return; } + _outputProcessors.GetOrAdd(sessionId, CreateOutputProcessor); session.OutputReceived += HandleSessionOutput; try @@ -114,11 +108,11 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable session.OutputReceived -= HandleSessionOutput; await session.DisposeAsync().ConfigureAwait(false); + await DisposeOutputProcessorAsync(sessionId).ConfigureAwait(false); } public async ValueTask DisposeAsync() { - _outputChannel.Writer.TryComplete(); _disposeCancellation.Cancel(); foreach (var session in _sessions.Values) @@ -128,19 +122,21 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable } _sessions.Clear(); - try - { - await _outputPump.ConfigureAwait(false); - } - catch (OperationCanceledException) + + foreach (var sessionId in _outputProcessors.Keys) { + await DisposeOutputProcessorAsync(sessionId).ConfigureAwait(false); } + _disposeCancellation.Dispose(); } private void HandleSessionOutput(object? sender, TerminalOutputEventArgs args) { - _outputChannel.Writer.TryWrite((args.SessionId, args.Chunk)); + if (_outputProcessors.TryGetValue(args.SessionId, out var processor)) + { + processor.Channel.Writer.TryWrite(args.Chunk); + } } private async Task PublishOutputAsync(string sessionId, string chunk) @@ -152,11 +148,60 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable OutputReceived?.Invoke(this, new TerminalOutputEventArgs(sessionId, chunk, ioEvent.Sequence)); } - private async Task PumpOutputAsync(CancellationToken cancellationToken) + private SessionOutputProcessor CreateOutputProcessor(string sessionId) { - await foreach (var output in _outputChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + var channel = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false, + }); + var pumpTask = Task.Run( + () => PumpSessionOutputAsync(sessionId, channel.Reader, _disposeCancellation.Token), + CancellationToken.None); + return new SessionOutputProcessor(channel, pumpTask); + } + + private async Task DisposeOutputProcessorAsync(string sessionId) + { + if (!_outputProcessors.TryRemove(sessionId, out var processor)) + { + return; + } + + processor.Channel.Writer.TryComplete(); + try + { + await processor.PumpTask.ConfigureAwait(false); + } + catch (OperationCanceledException) { - await PublishOutputAsync(output.SessionId, output.Chunk).ConfigureAwait(false); } } + + private async Task PumpSessionOutputAsync( + string sessionId, + ChannelReader reader, + CancellationToken cancellationToken) + { + await foreach (var chunk in reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + { + try + { + await PublishOutputAsync(sessionId, chunk).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + return; + } + catch + { + // Keep one broken session from poisoning every other session's output path. + } + } + } + + private sealed record SessionOutputProcessor( + Channel Channel, + Task PumpTask); } diff --git a/apps/windows_agent/tests/TermRemoteCtl.Agent.IntegrationTests/Realtime/TerminalWebSocketHandlerTests.cs b/apps/windows_agent/tests/TermRemoteCtl.Agent.IntegrationTests/Realtime/TerminalWebSocketHandlerTests.cs index e0318e9..cb6de06 100644 --- a/apps/windows_agent/tests/TermRemoteCtl.Agent.IntegrationTests/Realtime/TerminalWebSocketHandlerTests.cs +++ b/apps/windows_agent/tests/TermRemoteCtl.Agent.IntegrationTests/Realtime/TerminalWebSocketHandlerTests.cs @@ -7,6 +7,8 @@ using Microsoft.AspNetCore.Mvc.Testing; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; +using TermRemoteCtl.Agent.Configuration; using TermRemoteCtl.Agent.Sessions; using TermRemoteCtl.Agent.Terminal; @@ -169,6 +171,7 @@ public sealed class TerminalWebSocketHandlerTests CancellationToken.None)) { _ = await ReceiveTextAsync(socket, CancellationToken.None); + _ = await ReceiveTextAsync(socket, CancellationToken.None); var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir" }); await socket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None); @@ -215,6 +218,70 @@ public sealed class TerminalWebSocketHandlerTests Assert.Contains("\"sequence\":2", restoreFrame); } + [Fact] + [Trait("Track", "Mainline")] + public async Task Input_Acknowledgement_Deduplicates_Replayed_Client_Input() + { + await using var fixture = new TerminalApiFixture(); + var registry = fixture.Services.GetRequiredService(); + fixture.TerminalHost.Registry = registry; + var session = registry.Create("Shell", DateTimeOffset.UtcNow); + + using WebSocket firstSocket = await fixture.Server.CreateWebSocketClient().ConnectAsync( + new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"), + CancellationToken.None); + + _ = await ReceiveTextAsync(firstSocket, CancellationToken.None); + _ = await ReceiveTextAsync(firstSocket, CancellationToken.None); + + var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir", inputId = "input-1" }); + await firstSocket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None); + + var firstAckFrame = await ReceiveTextAsync(firstSocket, CancellationToken.None); + Assert.Contains("\"type\":\"inputAck\"", firstAckFrame); + Assert.Contains("\"inputId\":\"input-1\"", firstAckFrame); + + using WebSocket secondSocket = await fixture.Server.CreateWebSocketClient().ConnectAsync( + new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"), + CancellationToken.None); + + _ = await ReceiveTextAsync(secondSocket, CancellationToken.None); + _ = await ReceiveTextAsync(secondSocket, CancellationToken.None); + await secondSocket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None); + + var duplicateAckFrame = await ReceiveTextAsync(secondSocket, CancellationToken.None); + Assert.Contains("\"type\":\"inputAck\"", duplicateAckFrame); + Assert.Contains("\"inputId\":\"input-1\"", duplicateAckFrame); + Assert.Single(fixture.TerminalHost.Inputs.Where(item => item == ("input", session.SessionId, "dir"))); + } + + [Fact] + [Trait("Track", "Mainline")] + public async Task Attach_Includes_Screen_Snapshot_When_Backend_Screen_Protocol_Is_Enabled() + { + await using var fixture = new TerminalApiFixture(enableBackendScreenProtocol: true); + var registry = fixture.Services.GetRequiredService(); + fixture.TerminalHost.Registry = registry; + var session = registry.Create("Shell", DateTimeOffset.UtcNow); + await registry.RecordResizeAsync(session.SessionId, 40, 10, CancellationToken.None); + await registry.RecordOutputAsync(session.SessionId, "\u001b[2J\u001b[Hprompt> dir", CancellationToken.None); + + using WebSocket socket = await fixture.Server.CreateWebSocketClient().ConnectAsync( + new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"), + CancellationToken.None); + + _ = await ReceiveTextAsync(socket, CancellationToken.None); + var restoreFrame = await ReceiveTextAsync(socket, CancellationToken.None); + var restorePayload = JsonSerializer.Deserialize( + restoreFrame, + new JsonSerializerOptions(JsonSerializerDefaults.Web)); + + Assert.NotNull(restorePayload); + Assert.NotNull(restorePayload!.ScreenSnapshot); + Assert.Equal("primary", restorePayload.ScreenSnapshot!.ActiveBuffer); + Assert.Equal(2L, restorePayload.ScreenSnapshot.SourceSequence); + } + private static async Task ReceiveTextAsync(WebSocket socket, CancellationToken cancellationToken) { var buffer = new byte[4096]; @@ -264,10 +331,12 @@ public sealed class TerminalWebSocketHandlerTests private readonly string _dataRoot = Path.Combine(Path.GetTempPath(), "TermRemoteCtl.Tests", Guid.NewGuid().ToString("N")); private readonly TestTerminalSessionHost _terminalHost; private readonly RecordingTerminalDiagnosticsSink _diagnostics = new(); + private readonly bool _enableBackendScreenProtocol; - public TerminalApiFixture() + public TerminalApiFixture(bool enableBackendScreenProtocol = false) { _terminalHost = new TestTerminalSessionHost(); + _enableBackendScreenProtocol = enableBackendScreenProtocol; } public TestTerminalSessionHost TerminalHost => _terminalHost; @@ -285,11 +354,21 @@ public sealed class TerminalWebSocketHandlerTests ["Agent:BindAddress"] = "127.0.0.1", ["Agent:HttpsPort"] = "9443", ["Agent:WebSocketFrameFlushMilliseconds"] = "33", - ["Agent:RingBufferLineLimit"] = "4000" + ["Agent:RingBufferLineLimit"] = "4000", + ["Agent:EnableBackendScreenProtocol"] = _enableBackendScreenProtocol.ToString() }); }); builder.ConfigureServices(services => { + services.PostConfigure(options => + { + options.DataRoot = _dataRoot; + options.BindAddress = "127.0.0.1"; + options.HttpsPort = 9443; + options.WebSocketFrameFlushMilliseconds = 33; + options.RingBufferLineLimit = 4000; + options.EnableBackendScreenProtocol = _enableBackendScreenProtocol; + }); services.RemoveAll(); services.AddSingleton(_terminalHost); services.AddSingleton(sp => sp.GetRequiredService()); @@ -300,9 +379,26 @@ public sealed class TerminalWebSocketHandlerTests public new async ValueTask DisposeAsync() { await base.DisposeAsync(); - if (Directory.Exists(_dataRoot)) + if (!Directory.Exists(_dataRoot)) { - Directory.Delete(_dataRoot, true); + return; + } + + for (var attempt = 0; attempt < 5; attempt += 1) + { + try + { + Directory.Delete(_dataRoot, true); + return; + } + catch (IOException) when (attempt < 4) + { + await Task.Delay(50); + } + catch (UnauthorizedAccessException) when (attempt < 4) + { + await Task.Delay(50); + } } } } @@ -405,8 +501,14 @@ public sealed class TerminalWebSocketHandlerTests string PendingInput, int? CursorRow, int? CursorColumn, + TerminalScreenSnapshotResponse? ScreenSnapshot, string Type); + private sealed record TerminalScreenSnapshotResponse( + long ScreenVersion, + long SourceSequence, + string ActiveBuffer); + private sealed record TerminalOutputResponse( string SessionId, long Sequence, diff --git a/apps/windows_agent/tests/TermRemoteCtl.Agent.Tests/Terminal/PowerShellSessionHostTests.cs b/apps/windows_agent/tests/TermRemoteCtl.Agent.Tests/Terminal/PowerShellSessionHostTests.cs index cb5d33e..c74b27b 100644 --- a/apps/windows_agent/tests/TermRemoteCtl.Agent.Tests/Terminal/PowerShellSessionHostTests.cs +++ b/apps/windows_agent/tests/TermRemoteCtl.Agent.Tests/Terminal/PowerShellSessionHostTests.cs @@ -110,6 +110,53 @@ public class PowerShellSessionHostTests Assert.True(factory.CreatedSessions.Last().StartCount > 0); } + [Fact] + public async Task Output_Failure_From_One_Session_Does_Not_Block_Other_Sessions() + { + var factory = new FakeConPtySessionFactory(); + using var harness = HostHarness.Create(factory); + await using var host = harness.Host; + var blockedSession = harness.Registry.Create("blocked", DateTimeOffset.UtcNow); + var healthySession = harness.Registry.Create("healthy", DateTimeOffset.UtcNow); + + await host.StartAsync(blockedSession.SessionId, CancellationToken.None); + await host.StartAsync(healthySession.SessionId, CancellationToken.None); + + var blockedLogPath = Path.Combine( + harness.DataRoot, + "sessions", + $"{blockedSession.SessionId}.log"); + Directory.CreateDirectory(Path.GetDirectoryName(blockedLogPath)!); + await using var blockingStream = new FileStream( + blockedLogPath, + FileMode.OpenOrCreate, + FileAccess.ReadWrite, + FileShare.None); + + using var received = new ManualResetEventSlim(false); + var outputs = new List(); + host.OutputReceived += (_, args) => + { + lock (outputs) + { + outputs.Add(args); + if (args.SessionId == healthySession.SessionId) + { + received.Set(); + } + } + }; + + factory.CreatedSessions[0].EmitOutput(blockedSession.SessionId, "blocked-output"); + factory.CreatedSessions[1].EmitOutput(healthySession.SessionId, "healthy-output"); + + Assert.True(received.Wait(TimeSpan.FromSeconds(2))); + Assert.Contains( + outputs, + item => item.SessionId == healthySession.SessionId && + item.Chunk == "healthy-output"); + } + private sealed class HostHarness : IDisposable { private HostHarness(string dataRoot, SessionRegistry registry, PowerShellSessionHost host) @@ -148,7 +195,22 @@ public class PowerShellSessionHostTests { if (Directory.Exists(DataRoot)) { - Directory.Delete(DataRoot, true); + for (var attempt = 0; attempt < 5; attempt += 1) + { + try + { + Directory.Delete(DataRoot, true); + break; + } + catch (IOException) when (attempt < 4) + { + Thread.Sleep(50); + } + catch (UnauthorizedAccessException) when (attempt < 4) + { + Thread.Sleep(50); + } + } } } }