fix terminal recovery and isolate output pumps

This commit is contained in:
sladro 2026-04-11 07:57:56 +08:00
parent 2b2523d65f
commit 3d06ee0a19
21 changed files with 742 additions and 138 deletions

1
.gitignore vendored
View File

@ -50,6 +50,7 @@ Thumbs.db
data/ data/
certs/ certs/
.superpowers/ .superpowers/
.codex/
.worktrees/ .worktrees/
work/*.log work/*.log
work/dotnet-*/ work/dotnet-*/

View File

@ -16,10 +16,12 @@ class AgentSocketClient {
Map<String, dynamic> buildAttachMessage(String sessionId) => Map<String, dynamic> buildAttachMessage(String sessionId) =>
<String, dynamic>{'type': 'attach', 'sessionId': sessionId}; <String, dynamic>{'type': 'attach', 'sessionId': sessionId};
Map<String, dynamic> buildInputMessage(String input) => <String, dynamic>{ Map<String, dynamic> buildInputMessage(String input, {String? inputId}) =>
'type': 'input', <String, dynamic>{
'input': input, 'type': 'input',
}; 'input': input,
if (inputId != null && inputId.isNotEmpty) 'inputId': inputId,
};
Map<String, dynamic> buildResizeMessage(int columns, int rows) => Map<String, dynamic> buildResizeMessage(int columns, int rows) =>
<String, dynamic>{'type': 'resize', 'columns': columns, 'rows': rows}; <String, dynamic>{'type': 'resize', 'columns': columns, 'rows': rows};

View File

@ -21,7 +21,6 @@ import 'history_window.dart';
import 'repeatable_terminal_key_button.dart'; import 'repeatable_terminal_key_button.dart';
import 'terminal_interaction_controller.dart'; import 'terminal_interaction_controller.dart';
import 'terminal_restore_payload.dart'; import 'terminal_restore_payload.dart';
import 'terminal_restore_decision.dart';
import 'terminal_session_coordinator.dart'; import 'terminal_session_coordinator.dart';
import 'terminal_snapshot.dart'; import 'terminal_snapshot.dart';
import 'terminal_snapshot_storage.dart'; import 'terminal_snapshot_storage.dart';
@ -460,19 +459,10 @@ class _TerminalPageState extends ConsumerState<TerminalPage>
_receivedRestorePayload = true; _receivedRestorePayload = true;
_awaitingReconnectRestore = false; _awaitingReconnectRestore = false;
_cancelHistorySeedTimer(); _cancelHistorySeedTimer();
final combined = restore.screenText + restore.pendingInput; final restoreFrame = restore.buildReplayFrame();
if (combined.isEmpty) { _resetTerminalForReplay();
_scheduleSnapshotPersist(); if (restoreFrame.isNotEmpty) {
return; terminal.write(restoreFrame);
}
final decision = decideTerminalRestore(
currentText: terminal.buffer.getText(),
restoreText: combined,
);
if (decision == TerminalRestoreDecision.replaceWithRestore) {
_resetTerminalForReplay();
terminal.write(combined);
} }
_historySeeded = _terminalHasVisibleContent; _historySeeded = _terminalHasVisibleContent;
_scheduleSnapshotPersist(); _scheduleSnapshotPersist();
@ -568,10 +558,6 @@ class _TerminalPageState extends ConsumerState<TerminalPage>
terminal.buffer.getText().trim().isNotEmpty; terminal.buffer.getText().trim().isNotEmpty;
void _resetTerminalForReplay() { void _resetTerminalForReplay() {
if (!_terminalHasVisibleContent) {
return;
}
terminal.buffer.clear(); terminal.buffer.clear();
terminal.buffer.setCursor(0, 0); terminal.buffer.setCursor(0, 0);
terminal.notifyListeners(); terminal.notifyListeners();

View File

@ -4,32 +4,5 @@ TerminalRestoreDecision decideTerminalRestore({
required String currentText, required String currentText,
required String restoreText, required String restoreText,
}) { }) {
if (restoreText.isEmpty) {
return TerminalRestoreDecision.keepLocal;
}
if (currentText.isEmpty) {
return TerminalRestoreDecision.replaceWithRestore;
}
final normalizedCurrent = _normalizeTerminalText(currentText);
final normalizedRestore = _normalizeTerminalText(restoreText);
if (normalizedCurrent.isEmpty) {
return TerminalRestoreDecision.replaceWithRestore;
}
if (normalizedCurrent == normalizedRestore) {
return TerminalRestoreDecision.keepLocal;
}
if (normalizedCurrent.startsWith(normalizedRestore)) {
return TerminalRestoreDecision.keepLocal;
}
return TerminalRestoreDecision.replaceWithRestore; return TerminalRestoreDecision.replaceWithRestore;
} }
String _normalizeTerminalText(String text) {
return text.replaceAll('\r\n', '\n').replaceAll('\r', '\n');
}

View File

@ -1,15 +1,24 @@
import 'terminal_screen_snapshot.dart';
class TerminalRestorePayload { class TerminalRestorePayload {
const TerminalRestorePayload({ const TerminalRestorePayload({
required this.sessionId, required this.sessionId,
required this.sequence, required this.sequence,
required this.screenText, required this.screenText,
required this.pendingInput, required this.pendingInput,
this.screenSnapshot,
}); });
final String sessionId; final String sessionId;
final int sequence; final int sequence;
final String screenText; final String screenText;
final String pendingInput; final String pendingInput;
final TerminalScreenSnapshot? screenSnapshot;
String buildReplayFrame() {
final screenReplay = screenSnapshot?.toReplaySequence() ?? screenText;
return '$screenReplay$pendingInput';
}
factory TerminalRestorePayload.fromJson(Map<String, dynamic> json) { factory TerminalRestorePayload.fromJson(Map<String, dynamic> json) {
return TerminalRestorePayload( return TerminalRestorePayload(
@ -17,6 +26,11 @@ class TerminalRestorePayload {
sequence: json['sequence'] as int, sequence: json['sequence'] as int,
screenText: (json['screenText'] as String?) ?? '', screenText: (json['screenText'] as String?) ?? '',
pendingInput: (json['pendingInput'] as String?) ?? '', pendingInput: (json['pendingInput'] as String?) ?? '',
screenSnapshot: json['screenSnapshot'] is Map
? TerminalScreenSnapshot.fromJson(
Map<String, dynamic>.from(json['screenSnapshot'] as Map),
)
: null,
); );
} }
} }

View File

@ -80,7 +80,7 @@ class TerminalSessionCoordinator extends ChangeNotifier {
bool _connectionAttemptInProgress = false; bool _connectionAttemptInProgress = false;
bool _reconnectPending = false; bool _reconnectPending = false;
String _connectionStatus = 'Connecting...'; String _connectionStatus = 'Connecting...';
final List<String> _pendingInputs = <String>[]; final List<_PendingInputDispatch> _pendingInputs = <_PendingInputDispatch>[];
int _pendingInputCharacterCount = 0; int _pendingInputCharacterCount = 0;
int _sessionGeneration = 0; int _sessionGeneration = 0;
int? _pendingResizeColumns; int? _pendingResizeColumns;
@ -90,6 +90,7 @@ class TerminalSessionCoordinator extends ChangeNotifier {
int? _lastReceivedSequence; int? _lastReceivedSequence;
int? _recoveryGapBaselineSequence; int? _recoveryGapBaselineSequence;
bool _isBackendResizeEnabled = true; bool _isBackendResizeEnabled = true;
int _nextInputId = 0;
bool get isLoadingOlderHistory => _isLoadingOlderHistory; bool get isLoadingOlderHistory => _isLoadingOlderHistory;
@ -161,6 +162,13 @@ class TerminalSessionCoordinator extends ChangeNotifier {
_handleRestore(restore); _handleRestore(restore);
}, },
onInputAck: (inputId) {
if (!_isCurrentSession(socketSession, sessionGeneration)) {
return;
}
_handleInputAck(inputId);
},
onDisconnected: () { onDisconnected: () {
if (!_isCurrentSession(socketSession, sessionGeneration)) { if (!_isCurrentSession(socketSession, sessionGeneration)) {
return; return;
@ -214,7 +222,7 @@ class TerminalSessionCoordinator extends ChangeNotifier {
final socketSession = _socketSession; final socketSession = _socketSession;
if (socketSession == null) { if (socketSession == null) {
if (_shouldBufferInput(input)) { if (_shouldBufferInput(input)) {
_bufferInput(input); _enqueuePendingInput(input);
diagnosticLog?.add( diagnosticLog?.add(
'socket.input.buffer', 'socket.input.buffer',
'reason=no-session input=${_formatInputForDiagnostics(input)}', 'reason=no-session input=${_formatInputForDiagnostics(input)}',
@ -229,27 +237,37 @@ class TerminalSessionCoordinator extends ChangeNotifier {
return; return;
} }
final result = socketSession.sendInput(input); final pendingInput = _enqueuePendingInput(input);
if (pendingInput == null) {
diagnosticLog?.add('socket.input.skip', 'reason=empty-input');
return;
}
final result = socketSession.sendInput(
input,
inputId: pendingInput.inputId,
);
switch (result) { switch (result) {
case TerminalSocketDispatchResult.sent: case TerminalSocketDispatchResult.sent:
diagnosticLog?.add( diagnosticLog?.add(
'socket.input.tx', 'socket.input.tx',
_formatInputForDiagnostics(input), 'id=${pendingInput.inputId} ${_formatInputForDiagnostics(input)}',
); );
case TerminalSocketDispatchResult.noTransport: case TerminalSocketDispatchResult.noTransport:
if (_shouldBufferInput(input)) { if (_shouldBufferInput(input)) {
_bufferInput(input);
diagnosticLog?.add( diagnosticLog?.add(
'socket.input.buffer', 'socket.input.buffer',
'reason=no-transport input=${_formatInputForDiagnostics(input)}', 'reason=no-transport id=${pendingInput.inputId} input=${_formatInputForDiagnostics(input)}',
); );
} else { } else {
_removePendingInputById(pendingInput.inputId);
diagnosticLog?.add( diagnosticLog?.add(
'socket.input.skip', 'socket.input.skip',
'reason=no-transport input=${_formatInputForDiagnostics(input)}', 'reason=no-transport id=${pendingInput.inputId} input=${_formatInputForDiagnostics(input)}',
); );
} }
case TerminalSocketDispatchResult.emptyInput: case TerminalSocketDispatchResult.emptyInput:
_removePendingInputById(pendingInput.inputId);
diagnosticLog?.add('socket.input.skip', 'reason=empty-input'); diagnosticLog?.add('socket.input.skip', 'reason=empty-input');
} }
} }
@ -361,6 +379,12 @@ class TerminalSessionCoordinator extends ChangeNotifier {
} }
} }
void _handleInputAck(String inputId) {
if (_removePendingInputById(inputId)) {
diagnosticLog?.add('socket.input.ack', inputId);
}
}
Future<HistoryWindow?> loadRecentHistoryWindow() async { Future<HistoryWindow?> loadRecentHistoryWindow() async {
var history = controller.historyWindow.outputSeedText.isNotEmpty var history = controller.historyWindow.outputSeedText.isNotEmpty
? controller.historyWindow ? controller.historyWindow
@ -555,18 +579,23 @@ class TerminalSessionCoordinator extends ChangeNotifier {
return _connectionAttemptInProgress || _reconnectPending; return _connectionAttemptInProgress || _reconnectPending;
} }
void _bufferInput(String input) { _PendingInputDispatch? _enqueuePendingInput(String input) {
if (input.isEmpty) { if (input.isEmpty) {
return; return null;
} }
_pendingInputs.add(input); final dispatch = _PendingInputDispatch(
inputId: _buildPendingInputId(),
input: input,
);
_pendingInputs.add(dispatch);
_pendingInputCharacterCount += input.length; _pendingInputCharacterCount += input.length;
while (_pendingInputCharacterCount > pendingInputCharacterLimit && while (_pendingInputCharacterCount > pendingInputCharacterLimit &&
_pendingInputs.isNotEmpty) { _pendingInputs.isNotEmpty) {
final removed = _pendingInputs.removeAt(0); final removed = _pendingInputs.removeAt(0);
_pendingInputCharacterCount -= removed.length; _pendingInputCharacterCount -= removed.input.length;
} }
return dispatch;
} }
void _flushPendingInputs(TerminalSocketSession socketSession) { void _flushPendingInputs(TerminalSocketSession socketSession) {
@ -574,31 +603,46 @@ class TerminalSessionCoordinator extends ChangeNotifier {
return; return;
} }
final pendingInputs = List<String>.of(_pendingInputs); for (final pendingInput in _pendingInputs) {
_pendingInputs.clear(); final result = socketSession.sendInput(
_pendingInputCharacterCount = 0; pendingInput.input,
inputId: pendingInput.inputId,
for (var index = 0; index < pendingInputs.length; index += 1) { );
final input = pendingInputs[index];
final result = socketSession.sendInput(input);
if (result == TerminalSocketDispatchResult.sent) { if (result == TerminalSocketDispatchResult.sent) {
diagnosticLog?.add( diagnosticLog?.add(
'socket.input.flush', 'socket.input.flush',
_formatInputForDiagnostics(input), 'id=${pendingInput.inputId} ${_formatInputForDiagnostics(pendingInput.input)}',
); );
continue; continue;
} }
if (result == TerminalSocketDispatchResult.noTransport) { if (result == TerminalSocketDispatchResult.noTransport) {
for (final remainingInput in pendingInputs.skip(index)) { diagnosticLog?.add(
_bufferInput(remainingInput); 'socket.input.flush.pause',
} pendingInput.inputId,
);
} }
break; break;
} }
} }
bool _removePendingInputById(String inputId) {
final index = _pendingInputs.indexWhere(
(pendingInput) => pendingInput.inputId == inputId,
);
if (index < 0) {
return false;
}
final removed = _pendingInputs.removeAt(index);
_pendingInputCharacterCount -= removed.input.length;
if (_pendingInputCharacterCount < 0) {
_pendingInputCharacterCount = 0;
}
return true;
}
void _flushPendingResize() { void _flushPendingResize() {
_cancelResize = null; _cancelResize = null;
@ -750,6 +794,11 @@ class TerminalSessionCoordinator extends ChangeNotifier {
static String _compactControlText(String payload) { static String _compactControlText(String payload) {
return payload.replaceAll('\r', r'\r').replaceAll('\n', r'\n'); return payload.replaceAll('\r', r'\r').replaceAll('\n', r'\n');
} }
String _buildPendingInputId() {
_nextInputId += 1;
return '${session.sessionId}-input-$_nextInputId';
}
} }
class _JournalItem { class _JournalItem {
@ -774,3 +823,13 @@ class _JournalItem {
); );
} }
} }
class _PendingInputDispatch {
const _PendingInputDispatch({
required this.inputId,
required this.input,
});
final String inputId;
final String input;
}

View File

@ -59,6 +59,7 @@ class TerminalSocketSession {
Future<void> connect({ Future<void> connect({
required void Function(TerminalOutputPayload output) onOutput, required void Function(TerminalOutputPayload output) onOutput,
required void Function(TerminalRestorePayload restore) onRestore, required void Function(TerminalRestorePayload restore) onRestore,
void Function(String inputId)? onInputAck,
void Function()? onDisconnected, void Function()? onDisconnected,
}) async { }) async {
if (_transport != null || _subscription != null) { if (_transport != null || _subscription != null) {
@ -89,6 +90,10 @@ class TerminalSocketSession {
return; return;
} }
if (_handleInputAckFrame(message, onInputAck)) {
return;
}
final output = _decodeOutputFrame(message); final output = _decodeOutputFrame(message);
if (output != null) { if (output != null) {
onOutput(output); onOutput(output);
@ -132,7 +137,7 @@ class TerminalSocketSession {
} }
} }
TerminalSocketDispatchResult sendInput(String input) { TerminalSocketDispatchResult sendInput(String input, {String? inputId}) {
final transport = _transport; final transport = _transport;
if (input.isEmpty) { if (input.isEmpty) {
return TerminalSocketDispatchResult.emptyInput; return TerminalSocketDispatchResult.emptyInput;
@ -143,7 +148,9 @@ class TerminalSocketSession {
} }
try { try {
transport.send(jsonEncode(socketClient.buildInputMessage(input))); transport.send(
jsonEncode(socketClient.buildInputMessage(input, inputId: inputId)),
);
return TerminalSocketDispatchResult.sent; return TerminalSocketDispatchResult.sent;
} catch (_) { } catch (_) {
_handleTransportClosed(transport); _handleTransportClosed(transport);
@ -208,6 +215,30 @@ class TerminalSocketSession {
return false; return false;
} }
bool _handleInputAckFrame(
String frame,
void Function(String inputId)? onInputAck,
) {
if (onInputAck == null) {
return false;
}
try {
final decoded = jsonDecode(frame);
if (decoded is Map &&
decoded['type'] == 'inputAck' &&
_matchesSessionId(decoded)) {
final inputId = decoded['inputId'];
if (inputId is String && inputId.isNotEmpty) {
onInputAck(inputId);
return true;
}
}
} catch (_) {}
return false;
}
TerminalOutputPayload? _decodeOutputFrame(String frame) { TerminalOutputPayload? _decodeOutputFrame(String frame) {
try { try {
final decoded = jsonDecode(frame); final decoded = jsonDecode(frame);

View File

@ -23,9 +23,10 @@ void main() {
test('builds input message for terminal input', () { test('builds input message for terminal input', () {
final client = AgentSocketClient(Uri.parse('https://host:9443')); final client = AgentSocketClient(Uri.parse('https://host:9443'));
expect(client.buildInputMessage('ls'), <String, dynamic>{ expect(client.buildInputMessage('ls', inputId: 'input-1'), <String, dynamic>{
'type': 'input', 'type': 'input',
'input': 'ls', 'input': 'ls',
'inputId': 'input-1',
}); });
}); });
} }

View File

@ -2,13 +2,15 @@ import 'package:flutter_test/flutter_test.dart';
import 'package:term_remote_ctl/features/terminal/terminal_restore_decision.dart'; import 'package:term_remote_ctl/features/terminal/terminal_restore_decision.dart';
void main() { void main() {
test('keeps the local terminal content when restore is a shorter prefix', () { test(
'replaces the local terminal content when restore is a shorter authoritative prefix',
() {
final decision = decideTerminalRestore( final decision = decideTerminalRestore(
currentText: 'PS> git status\r\nmodified: file.txt\r\nPS> ', currentText: 'PS> git status\r\nmodified: file.txt\r\nPS> ',
restoreText: 'PS> git status\r\n', restoreText: 'PS> git status\r\n',
); );
expect(decision, TerminalRestoreDecision.keepLocal); expect(decision, TerminalRestoreDecision.replaceWithRestore);
}); });
test('replaces local content when restore extends the current content', () { test('replaces local content when restore extends the current content', () {

View File

@ -6,6 +6,7 @@ import 'package:term_remote_ctl/core/network/agent_socket_client.dart';
import 'package:term_remote_ctl/features/sessions/session.dart'; import 'package:term_remote_ctl/features/sessions/session.dart';
import 'package:term_remote_ctl/features/terminal/terminal_interaction_controller.dart'; import 'package:term_remote_ctl/features/terminal/terminal_interaction_controller.dart';
import 'package:term_remote_ctl/features/terminal/terminal_output_payload.dart'; import 'package:term_remote_ctl/features/terminal/terminal_output_payload.dart';
import 'package:term_remote_ctl/features/terminal/terminal_restore_decision.dart';
import 'package:term_remote_ctl/features/terminal/terminal_restore_payload.dart'; import 'package:term_remote_ctl/features/terminal/terminal_restore_payload.dart';
import 'package:term_remote_ctl/features/terminal/terminal_session_coordinator.dart'; import 'package:term_remote_ctl/features/terminal/terminal_session_coordinator.dart';
import 'package:term_remote_ctl/features/terminal/terminal_socket_session.dart'; import 'package:term_remote_ctl/features/terminal/terminal_socket_session.dart';
@ -122,9 +123,64 @@ void main() {
expect(sessionFactory.createdSessions, hasLength(2)); expect(sessionFactory.createdSessions, hasLength(2));
expect(sessionFactory.createdSessions.last.sentInputs, ['dir\r']); expect(sessionFactory.createdSessions.last.sentInputs, ['dir\r']);
expect(sessionFactory.createdSessions.last.sentInputIds, ['abc-input-1']);
}, },
); );
test(
'unacknowledged input is resent after reconnect and removed only after ack',
() async {
final controller = TerminalInteractionController();
final apiClient = _FakeAgentApiClient();
final sessionFactory = _FakeTerminalSessionFactory();
final reconnectScheduler = _FakeReconnectScheduler();
final session = Session(
sessionId: 'abc',
name: 'codex-main',
status: 'idle',
);
final coordinator = TerminalSessionCoordinator(
controller: controller,
apiClient: apiClient,
session: session,
sessionFactory: sessionFactory.create,
onFrame: (_) {},
onRestore: (_) {},
viewportProvider: () => const TerminalViewport(columns: 80, rows: 24),
reconnectScheduler: reconnectScheduler.schedule,
);
await coordinator.start();
final firstSession = sessionFactory.createdSessions.single;
coordinator.sendInput('dir\r');
expect(firstSession.sentInputIds, ['abc-input-1']);
firstSession.disconnect();
await reconnectScheduler.runPending();
final secondSession = sessionFactory.createdSessions.last;
expect(secondSession.sentInputs, ['dir\r']);
expect(secondSession.sentInputIds, ['abc-input-1']);
secondSession.ackInput('abc-input-1');
secondSession.disconnect();
await reconnectScheduler.runPending();
final thirdSession = sessionFactory.createdSessions.last;
expect(thirdSession.sentInputs, isEmpty);
},
);
test('restore payloads are treated as authoritative over provisional text', () {
final decision = decideTerminalRestore(
currentText: 'PS> git status\r\nmodified: file.txt\r\nPS> ',
restoreText: 'PS> git status\r\n',
);
expect(decision, TerminalRestoreDecision.replaceWithRestore);
});
test( test(
'loadOlderHistory pages backward with beforeSequence instead of expanding lineCount', 'loadOlderHistory pages backward with beforeSequence instead of expanding lineCount',
() async { () async {
@ -920,12 +976,14 @@ class _FakeTerminalSocketSession extends TerminalSocketSession {
final bool autoConnect; final bool autoConnect;
final resizeCalls = <List<int>>[]; final resizeCalls = <List<int>>[];
final sentInputs = <String>[]; final sentInputs = <String>[];
final sentInputIds = <String>[];
int disposeCount = 0; int disposeCount = 0;
Completer<void>? disposeCompleter; Completer<void>? disposeCompleter;
Completer<void> _connectCompleter = Completer<void>(); Completer<void> _connectCompleter = Completer<void>();
void Function(String frame)? _onFrame; void Function(String frame)? _onFrame;
void Function(TerminalOutputPayload output)? _onOutput; void Function(TerminalOutputPayload output)? _onOutput;
void Function()? _onDisconnected; void Function()? _onDisconnected;
void Function(String inputId)? _onInputAck;
bool _isDisconnected = false; bool _isDisconnected = false;
void Function(TerminalRestorePayload restore)? _onRestore; void Function(TerminalRestorePayload restore)? _onRestore;
@ -933,10 +991,12 @@ class _FakeTerminalSocketSession extends TerminalSocketSession {
Future<void> connect({ Future<void> connect({
required void Function(TerminalOutputPayload output) onOutput, required void Function(TerminalOutputPayload output) onOutput,
required void Function(TerminalRestorePayload restore) onRestore, required void Function(TerminalRestorePayload restore) onRestore,
void Function(String inputId)? onInputAck,
void Function()? onDisconnected, void Function()? onDisconnected,
}) { }) {
_onOutput = onOutput; _onOutput = onOutput;
_onRestore = onRestore; _onRestore = onRestore;
_onInputAck = onInputAck;
_onDisconnected = onDisconnected; _onDisconnected = onDisconnected;
if (autoConnect && !_connectCompleter.isCompleted) { if (autoConnect && !_connectCompleter.isCompleted) {
_connectCompleter.complete(); _connectCompleter.complete();
@ -950,12 +1010,15 @@ class _FakeTerminalSocketSession extends TerminalSocketSession {
} }
@override @override
TerminalSocketDispatchResult sendInput(String input) { TerminalSocketDispatchResult sendInput(String input, {String? inputId}) {
if (_isDisconnected || !_connectCompleter.isCompleted) { if (_isDisconnected || !_connectCompleter.isCompleted) {
return TerminalSocketDispatchResult.noTransport; return TerminalSocketDispatchResult.noTransport;
} }
sentInputs.add(input); sentInputs.add(input);
if (inputId != null) {
sentInputIds.add(inputId);
}
return TerminalSocketDispatchResult.sent; return TerminalSocketDispatchResult.sent;
} }
@ -978,6 +1041,10 @@ class _FakeTerminalSocketSession extends TerminalSocketSession {
_onDisconnected?.call(); _onDisconnected?.call();
} }
void ackInput(String inputId) {
_onInputAck?.call(inputId);
}
@override @override
Future<void> dispose() async { Future<void> dispose() async {
disposeCount += 1; disposeCount += 1;

View File

@ -79,11 +79,11 @@ void main() {
await Future<void>.delayed(Duration.zero); await Future<void>.delayed(Duration.zero);
transport.emit('{"type":"attached","sessionId":"session-123"}'); transport.emit('{"type":"attached","sessionId":"session-123"}');
await connectFuture; await connectFuture;
session.sendInput('dir\r'); session.sendInput('dir\r', inputId: 'input-1');
expect( expect(
transport.sentMessages, transport.sentMessages,
contains('{"type":"input","input":"dir\\r"}'), contains('{"type":"input","input":"dir\\r","inputId":"input-1"}'),
); );
await session.dispose(); await session.dispose();
@ -197,6 +197,33 @@ void main() {
expect(outputs.single.chunk, 'live-output'); expect(outputs.single.chunk, 'live-output');
}); });
test('connect routes input acknowledgements to onInputAck', () async {
final transport = _FakeTerminalSocketTransport();
final session = TerminalSocketSession(
sessionId: 'session-123',
socketClient: AgentSocketClient(Uri.parse('https://host:9443')),
transportFactory: (_) => transport,
);
final acknowledgements = <String>[];
final connectFuture = session.connect(
onOutput: (_) {},
onRestore: (_) {},
onInputAck: acknowledgements.add,
);
await Future<void>.delayed(Duration.zero);
transport.emit('{"type":"attached","sessionId":"session-123"}');
await connectFuture;
transport.emit(
'{"type":"inputAck","sessionId":"session-123","inputId":"input-1"}',
);
await Future<void>.delayed(Duration.zero);
expect(acknowledgements, ['input-1']);
});
test('connect ignores unknown json control frames', () async { test('connect ignores unknown json control frames', () async {
final transport = _FakeTerminalSocketTransport(); final transport = _FakeTerminalSocketTransport();
final session = TerminalSocketSession( final session = TerminalSocketSession(

View File

@ -1326,7 +1326,7 @@ void main() {
); );
testWidgets( testWidgets(
'terminal reconnect keeps a richer local snapshot when restore is shorter', 'terminal reconnect replaces a provisional local snapshot with authoritative restore output',
(tester) async { (tester) async {
final snapshotStorage = _MemoryTerminalSnapshotStorage([ final snapshotStorage = _MemoryTerminalSnapshotStorage([
const TerminalSnapshot( const TerminalSnapshot(
@ -1360,7 +1360,8 @@ void main() {
final terminal = tester final terminal = tester
.widget<TerminalView>(find.byType(TerminalView)) .widget<TerminalView>(find.byType(TerminalView))
.terminal; .terminal;
expect(terminal.buffer.getText(), contains('modified: file.txt')); expect(terminal.buffer.getText(), isNot(contains('modified: file.txt')));
expect(terminal.buffer.getText(), contains('PS> git status'));
}, },
); );

View File

@ -1,10 +1,12 @@
using System.Text; using System.Text;
using System.Collections.Concurrent;
namespace TermRemoteCtl.Agent.History; namespace TermRemoteCtl.Agent.History;
public sealed class SessionHistoryStore public sealed class SessionHistoryStore
{ {
private static readonly UTF8Encoding Utf8WithoutBom = new(false); private static readonly UTF8Encoding Utf8WithoutBom = new(false);
private readonly ConcurrentDictionary<string, SemaphoreSlim> _writeGates = new();
private readonly string _historyRootPath; private readonly string _historyRootPath;
public SessionHistoryStore(string rootPath) public SessionHistoryStore(string rootPath)
@ -19,30 +21,47 @@ public sealed class SessionHistoryStore
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(chunk); ArgumentNullException.ThrowIfNull(chunk);
var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log"); var gate = _writeGates.GetOrAdd(sessionId, static _ => new SemaphoreSlim(1, 1));
await using var stream = new FileStream( await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
filePath, try
FileMode.Append, {
FileAccess.Write, var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log");
FileShare.Read, await using var stream = new FileStream(
4096, filePath,
FileOptions.Asynchronous); FileMode.Append,
await using var writer = new StreamWriter(stream, Utf8WithoutBom); FileAccess.Write,
await writer.WriteAsync(chunk.AsMemory(), cancellationToken); FileShare.Read,
await writer.FlushAsync(cancellationToken); 4096,
FileOptions.Asynchronous);
await using var writer = new StreamWriter(stream, Utf8WithoutBom);
await writer.WriteAsync(chunk.AsMemory(), cancellationToken).ConfigureAwait(false);
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
gate.Release();
}
} }
public Task DeleteAsync(string sessionId, CancellationToken cancellationToken) public async Task DeleteAsync(string sessionId, CancellationToken cancellationToken)
{ {
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log"); var gate = _writeGates.GetOrAdd(sessionId, static _ => new SemaphoreSlim(1, 1));
if (File.Exists(filePath)) await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{ {
File.Delete(filePath); var filePath = Path.Combine(_historyRootPath, $"{sessionId}.log");
if (File.Exists(filePath))
{
File.Delete(filePath);
}
}
finally
{
gate.Release();
_writeGates.TryRemove(sessionId, out _);
} }
return Task.CompletedTask;
} }
} }

View File

@ -1,5 +1,6 @@
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
using System.Collections.Concurrent;
namespace TermRemoteCtl.Agent.History; namespace TermRemoteCtl.Agent.History;
@ -11,6 +12,7 @@ public sealed class SessionIoJournalStore
}; };
private static readonly UTF8Encoding Utf8WithoutBom = new(false); private static readonly UTF8Encoding Utf8WithoutBom = new(false);
private readonly ConcurrentDictionary<string, SemaphoreSlim> _writeGates = new();
private readonly string _sessionRoot; private readonly string _sessionRoot;
public SessionIoJournalStore(string rootPath) public SessionIoJournalStore(string rootPath)
@ -25,9 +27,18 @@ public sealed class SessionIoJournalStore
{ {
ArgumentNullException.ThrowIfNull(ioEvent); ArgumentNullException.ThrowIfNull(ioEvent);
var filePath = Path.Combine(_sessionRoot, $"{ioEvent.SessionId}.io.jsonl"); var gate = _writeGates.GetOrAdd(ioEvent.SessionId, static _ => new SemaphoreSlim(1, 1));
var line = JsonSerializer.Serialize(ioEvent, SerializerOptions) + Environment.NewLine; await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
await File.AppendAllTextAsync(filePath, line, Utf8WithoutBom, cancellationToken).ConfigureAwait(false); try
{
var filePath = Path.Combine(_sessionRoot, $"{ioEvent.SessionId}.io.jsonl");
var line = JsonSerializer.Serialize(ioEvent, SerializerOptions) + Environment.NewLine;
await File.AppendAllTextAsync(filePath, line, Utf8WithoutBom, cancellationToken).ConfigureAwait(false);
}
finally
{
gate.Release();
}
} }
public async Task<SessionJournalPage> ReadAsync( public async Task<SessionJournalPage> ReadAsync(
@ -89,18 +100,26 @@ public sealed class SessionIoJournalStore
CurrentSequence: currentSequence); CurrentSequence: currentSequence);
} }
public Task DeleteAsync(string sessionId, CancellationToken cancellationToken) public async Task DeleteAsync(string sessionId, CancellationToken cancellationToken)
{ {
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
var filePath = Path.Combine(_sessionRoot, $"{sessionId}.io.jsonl"); var gate = _writeGates.GetOrAdd(sessionId, static _ => new SemaphoreSlim(1, 1));
if (File.Exists(filePath)) await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{ {
File.Delete(filePath); var filePath = Path.Combine(_sessionRoot, $"{sessionId}.io.jsonl");
if (File.Exists(filePath))
{
File.Delete(filePath);
}
}
finally
{
gate.Release();
_writeGates.TryRemove(sessionId, out _);
} }
return Task.CompletedTask;
} }
private async Task<List<SessionIoEvent>> ReadAllAsync(string sessionId, CancellationToken cancellationToken) private async Task<List<SessionIoEvent>> ReadAllAsync(string sessionId, CancellationToken cancellationToken)

View File

@ -2,6 +2,7 @@ using System.Net.WebSockets;
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
using TermRemoteCtl.Agent.Sessions; using TermRemoteCtl.Agent.Sessions;
using TermRemoteCtl.Agent.Terminal.Screen;
using TermRemoteCtl.Agent.Terminal; using TermRemoteCtl.Agent.Terminal;
namespace TermRemoteCtl.Agent.Realtime; namespace TermRemoteCtl.Agent.Realtime;
@ -82,11 +83,19 @@ public static class TerminalWebSocketHandler
restore.ScreenText, restore.ScreenText,
restore.PendingInput, restore.PendingInput,
restore.CursorRow, restore.CursorRow,
restore.CursorColumn), restore.CursorColumn,
MapScreenSnapshot(restore.ScreenSnapshot)),
sendGate, sendGate,
context.RequestAborted).ConfigureAwait(false); context.RequestAborted).ConfigureAwait(false);
host.OutputReceived += HandleOutput; host.OutputReceived += HandleOutput;
await ReceiveLoopAsync(context, socket, host, registry, diagnostics, sessionId).ConfigureAwait(false); await ReceiveLoopAsync(
context,
socket,
host,
registry,
diagnostics,
sessionId,
sendGate).ConfigureAwait(false);
} }
finally finally
{ {
@ -107,7 +116,8 @@ public static class TerminalWebSocketHandler
ISessionHost host, ISessionHost host,
SessionRegistry registry, SessionRegistry registry,
ITerminalDiagnosticsSink diagnostics, ITerminalDiagnosticsSink diagnostics,
string sessionId) string sessionId,
SemaphoreSlim sendGate)
{ {
var buffer = new byte[4096]; var buffer = new byte[4096];
@ -135,20 +145,24 @@ public static class TerminalWebSocketHandler
await HandleClientMessageAsync( await HandleClientMessageAsync(
Encoding.UTF8.GetString(message.ToArray()), Encoding.UTF8.GetString(message.ToArray()),
socket,
registry, registry,
host, host,
diagnostics, diagnostics,
sessionId, sessionId,
sendGate,
context.RequestAborted).ConfigureAwait(false); context.RequestAborted).ConfigureAwait(false);
} }
} }
private static async Task HandleClientMessageAsync( private static async Task HandleClientMessageAsync(
string payload, string payload,
WebSocket socket,
SessionRegistry registry, SessionRegistry registry,
ISessionHost host, ISessionHost host,
ITerminalDiagnosticsSink diagnostics, ITerminalDiagnosticsSink diagnostics,
string sessionId, string sessionId,
SemaphoreSlim sendGate,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
TerminalClientMessage? message; TerminalClientMessage? message;
@ -178,9 +192,47 @@ public static class TerminalWebSocketHandler
{ {
if (!string.IsNullOrEmpty(message.Input)) if (!string.IsNullOrEmpty(message.Input))
{ {
await registry.RecordInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false); if (!string.IsNullOrWhiteSpace(message.InputId))
diagnostics.Record("backend.input.received", sessionId, SanitizeDiagnosticText(message.Input)); {
await host.WriteInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false); if (!registry.TryBeginInputReceipt(sessionId, message.InputId, out var existingReceipt))
{
if (existingReceipt is not null && await existingReceipt.ConfigureAwait(false))
{
await SendJsonAsync(
socket,
new TerminalInputAckResponse(sessionId, message.InputId),
sendGate,
cancellationToken).ConfigureAwait(false);
}
return;
}
}
try
{
diagnostics.Record("backend.input.received", sessionId, SanitizeDiagnosticText(message.Input));
await host.WriteInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false);
await registry.RecordInputAsync(sessionId, message.Input, cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(message.InputId))
{
registry.CompleteInputReceipt(sessionId, message.InputId, succeeded: true);
await SendJsonAsync(
socket,
new TerminalInputAckResponse(sessionId, message.InputId),
sendGate,
cancellationToken).ConfigureAwait(false);
}
}
catch
{
if (!string.IsNullOrWhiteSpace(message.InputId))
{
registry.CompleteInputReceipt(sessionId, message.InputId, succeeded: false);
}
throw;
}
} }
return; return;
@ -202,6 +254,36 @@ public static class TerminalWebSocketHandler
return input.Replace("\r", "\\r", StringComparison.Ordinal).Replace("\n", "\\n", StringComparison.Ordinal); return input.Replace("\r", "\\r", StringComparison.Ordinal).Replace("\n", "\\n", StringComparison.Ordinal);
} }
private static TerminalScreenSnapshotResponse? MapScreenSnapshot(
TerminalScreenSnapshot? snapshot)
{
if (snapshot is null)
{
return null;
}
return new TerminalScreenSnapshotResponse(
snapshot.ScreenVersion,
snapshot.SourceSequence,
snapshot.Rows,
snapshot.Columns,
snapshot.CursorRow,
snapshot.CursorColumn,
snapshot.CursorVisible,
snapshot.ActiveBuffer,
MapScreenBuffer(snapshot.PrimaryBuffer),
snapshot.AlternateBuffer is null ? null : MapScreenBuffer(snapshot.AlternateBuffer));
}
private static TerminalScreenBufferSnapshotResponse MapScreenBuffer(
TerminalScreenBufferSnapshot buffer)
{
return new TerminalScreenBufferSnapshotResponse(
buffer.Viewport
.Select(static line => new TerminalScreenLineSnapshotResponse(line.Index, line.Text))
.ToArray());
}
private static async Task SendJsonAsync( private static async Task SendJsonAsync(
WebSocket socket, WebSocket socket,
object response, object response,
@ -244,6 +326,7 @@ public static class TerminalWebSocketHandler
string PendingInput, string PendingInput,
int? CursorRow, int? CursorRow,
int? CursorColumn, int? CursorColumn,
TerminalScreenSnapshotResponse? ScreenSnapshot,
string Type = "restore"); string Type = "restore");
private sealed record TerminalOutputResponse( private sealed record TerminalOutputResponse(
@ -252,10 +335,35 @@ public static class TerminalWebSocketHandler
string Chunk, string Chunk,
string Type = "output"); string Type = "output");
private sealed record TerminalInputAckResponse(
string SessionId,
string InputId,
string Type = "inputAck");
private sealed record TerminalScreenSnapshotResponse(
long ScreenVersion,
long SourceSequence,
int Rows,
int Columns,
int CursorRow,
int CursorColumn,
bool CursorVisible,
string ActiveBuffer,
TerminalScreenBufferSnapshotResponse PrimaryBuffer,
TerminalScreenBufferSnapshotResponse? AlternateBuffer);
private sealed record TerminalScreenBufferSnapshotResponse(
IReadOnlyList<TerminalScreenLineSnapshotResponse> Viewport);
private sealed record TerminalScreenLineSnapshotResponse(
int Index,
string Text);
private sealed record TerminalClientMessage( private sealed record TerminalClientMessage(
string Type, string Type,
string? SessionId, string? SessionId,
string? Input, string? Input,
string? InputId,
int? Columns, int? Columns,
int? Rows); int? Rows);
} }

View File

@ -15,6 +15,7 @@ public sealed class SessionRegistry
private readonly ConcurrentDictionary<string, TerminalRingBuffer> _historyBySession = new(); private readonly ConcurrentDictionary<string, TerminalRingBuffer> _historyBySession = new();
private readonly ConcurrentDictionary<string, TerminalReplayBuffer> _replayBySession = new(); private readonly ConcurrentDictionary<string, TerminalReplayBuffer> _replayBySession = new();
private readonly ConcurrentDictionary<string, PendingInputEchoTracker> _pendingInputEchoBySession = new(); private readonly ConcurrentDictionary<string, PendingInputEchoTracker> _pendingInputEchoBySession = new();
private readonly ConcurrentDictionary<string, TerminalInputReceiptTracker> _inputReceiptsBySession = new();
private readonly ConcurrentDictionary<string, TerminalScreenEngine> _screenBySession = new(); private readonly ConcurrentDictionary<string, TerminalScreenEngine> _screenBySession = new();
private readonly ConcurrentDictionary<string, long> _sequenceBySession = new(); private readonly ConcurrentDictionary<string, long> _sequenceBySession = new();
private readonly SessionHistoryStore _historyStore; private readonly SessionHistoryStore _historyStore;
@ -54,6 +55,7 @@ public sealed class SessionRegistry
_historyBySession[record.SessionId] = new TerminalRingBuffer(_ringBufferLineLimit); _historyBySession[record.SessionId] = new TerminalRingBuffer(_ringBufferLineLimit);
_replayBySession[record.SessionId] = new TerminalReplayBuffer(ReplayCharacterLimit); _replayBySession[record.SessionId] = new TerminalReplayBuffer(ReplayCharacterLimit);
_pendingInputEchoBySession[record.SessionId] = new PendingInputEchoTracker(); _pendingInputEchoBySession[record.SessionId] = new PendingInputEchoTracker();
_inputReceiptsBySession[record.SessionId] = new TerminalInputReceiptTracker();
if (_enableBackendScreenProtocol) if (_enableBackendScreenProtocol)
{ {
_screenBySession[record.SessionId] = new TerminalScreenEngine(); _screenBySession[record.SessionId] = new TerminalScreenEngine();
@ -302,6 +304,9 @@ public sealed class SessionRegistry
sessionId, sessionId,
_ => new PendingInputEchoTracker()); _ => new PendingInputEchoTracker());
var sequence = GetCurrentSequence(sessionId); var sequence = GetCurrentSequence(sessionId);
var screenSnapshot = _enableBackendScreenProtocol
? _screenBySession.GetOrAdd(sessionId, _ => new TerminalScreenEngine()).CreateSnapshot(sessionId)
: null;
return new SessionRestoreSnapshot( return new SessionRestoreSnapshot(
sessionId, sessionId,
@ -309,7 +314,38 @@ public sealed class SessionRegistry
replay.GetSnapshot(), replay.GetSnapshot(),
pendingInputEcho.GetVisibleSuffix(), pendingInputEcho.GetVisibleSuffix(),
null, null,
null); null,
screenSnapshot);
}
public bool TryBeginInputReceipt(
string sessionId,
string inputId,
out Task<bool>? existingReceipt)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentException.ThrowIfNullOrWhiteSpace(inputId);
if (!_records.ContainsKey(sessionId))
{
throw new KeyNotFoundException($"Session '{sessionId}' was not found.");
}
var tracker = _inputReceiptsBySession.GetOrAdd(
sessionId,
_ => new TerminalInputReceiptTracker());
return tracker.TryBegin(inputId, out existingReceipt);
}
public void CompleteInputReceipt(string sessionId, string inputId, bool succeeded)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentException.ThrowIfNullOrWhiteSpace(inputId);
var tracker = _inputReceiptsBySession.GetOrAdd(
sessionId,
_ => new TerminalInputReceiptTracker());
tracker.Complete(inputId, succeeded);
} }
public TerminalScreenSnapshot GetScreenSnapshot(string sessionId) public TerminalScreenSnapshot GetScreenSnapshot(string sessionId)
@ -354,6 +390,7 @@ public sealed class SessionRegistry
_historyBySession.TryRemove(sessionId, out _); _historyBySession.TryRemove(sessionId, out _);
_replayBySession.TryRemove(sessionId, out _); _replayBySession.TryRemove(sessionId, out _);
_pendingInputEchoBySession.TryRemove(sessionId, out _); _pendingInputEchoBySession.TryRemove(sessionId, out _);
_inputReceiptsBySession.TryRemove(sessionId, out _);
_screenBySession.TryRemove(sessionId, out _); _screenBySession.TryRemove(sessionId, out _);
_sequenceBySession.TryRemove(sessionId, out _); _sequenceBySession.TryRemove(sessionId, out _);
await _historyStore.DeleteAsync(sessionId, cancellationToken).ConfigureAwait(false); await _historyStore.DeleteAsync(sessionId, cancellationToken).ConfigureAwait(false);

View File

@ -1,3 +1,5 @@
using TermRemoteCtl.Agent.Terminal.Screen;
namespace TermRemoteCtl.Agent.Sessions; namespace TermRemoteCtl.Agent.Sessions;
public sealed record SessionRestoreSnapshot( public sealed record SessionRestoreSnapshot(
@ -6,4 +8,5 @@ public sealed record SessionRestoreSnapshot(
string ScreenText, string ScreenText,
string PendingInput, string PendingInput,
int? CursorRow, int? CursorRow,
int? CursorColumn); int? CursorColumn,
TerminalScreenSnapshot? ScreenSnapshot);

View File

@ -0,0 +1,45 @@
using System.Collections.Concurrent;
namespace TermRemoteCtl.Agent.Sessions;
internal sealed class TerminalInputReceiptTracker
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<bool>> _receipts = new();
public bool TryBegin(string inputId, out Task<bool>? existingReceipt)
{
ArgumentException.ThrowIfNullOrWhiteSpace(inputId);
var completionSource = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
if (_receipts.TryAdd(inputId, completionSource))
{
existingReceipt = null;
return true;
}
existingReceipt = _receipts[inputId].Task;
return false;
}
public void Complete(string inputId, bool succeeded)
{
ArgumentException.ThrowIfNullOrWhiteSpace(inputId);
if (!_receipts.TryGetValue(inputId, out var completionSource))
{
return;
}
if (succeeded)
{
completionSource.TrySetResult(true);
return;
}
if (_receipts.TryRemove(inputId, out var removed))
{
removed.TrySetResult(false);
}
}
}

View File

@ -11,14 +11,8 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable
private readonly IConPtySessionFactory _sessionFactory; private readonly IConPtySessionFactory _sessionFactory;
private readonly SessionRegistry _sessionRegistry; private readonly SessionRegistry _sessionRegistry;
private readonly ConcurrentDictionary<string, IConPtySession> _sessions = new(StringComparer.Ordinal); private readonly ConcurrentDictionary<string, IConPtySession> _sessions = new(StringComparer.Ordinal);
private readonly Channel<(string SessionId, string Chunk)> _outputChannel = Channel.CreateUnbounded<(string SessionId, string Chunk)>( private readonly ConcurrentDictionary<string, SessionOutputProcessor> _outputProcessors = new(StringComparer.Ordinal);
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
});
private readonly CancellationTokenSource _disposeCancellation = new(); private readonly CancellationTokenSource _disposeCancellation = new();
private readonly Task _outputPump;
public PowerShellSessionHost( public PowerShellSessionHost(
IConPtySessionFactory sessionFactory, IConPtySessionFactory sessionFactory,
@ -26,7 +20,6 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable
{ {
_sessionFactory = sessionFactory; _sessionFactory = sessionFactory;
_sessionRegistry = sessionRegistry; _sessionRegistry = sessionRegistry;
_outputPump = Task.Run(() => PumpOutputAsync(_disposeCancellation.Token));
} }
public event EventHandler<TerminalOutputEventArgs>? OutputReceived; public event EventHandler<TerminalOutputEventArgs>? OutputReceived;
@ -62,6 +55,7 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable
return; return;
} }
_outputProcessors.GetOrAdd(sessionId, CreateOutputProcessor);
session.OutputReceived += HandleSessionOutput; session.OutputReceived += HandleSessionOutput;
try try
@ -114,11 +108,11 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable
session.OutputReceived -= HandleSessionOutput; session.OutputReceived -= HandleSessionOutput;
await session.DisposeAsync().ConfigureAwait(false); await session.DisposeAsync().ConfigureAwait(false);
await DisposeOutputProcessorAsync(sessionId).ConfigureAwait(false);
} }
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
_outputChannel.Writer.TryComplete();
_disposeCancellation.Cancel(); _disposeCancellation.Cancel();
foreach (var session in _sessions.Values) foreach (var session in _sessions.Values)
@ -128,19 +122,21 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable
} }
_sessions.Clear(); _sessions.Clear();
try
{ foreach (var sessionId in _outputProcessors.Keys)
await _outputPump.ConfigureAwait(false);
}
catch (OperationCanceledException)
{ {
await DisposeOutputProcessorAsync(sessionId).ConfigureAwait(false);
} }
_disposeCancellation.Dispose(); _disposeCancellation.Dispose();
} }
private void HandleSessionOutput(object? sender, TerminalOutputEventArgs args) private void HandleSessionOutput(object? sender, TerminalOutputEventArgs args)
{ {
_outputChannel.Writer.TryWrite((args.SessionId, args.Chunk)); if (_outputProcessors.TryGetValue(args.SessionId, out var processor))
{
processor.Channel.Writer.TryWrite(args.Chunk);
}
} }
private async Task PublishOutputAsync(string sessionId, string chunk) private async Task PublishOutputAsync(string sessionId, string chunk)
@ -152,11 +148,60 @@ internal sealed class PowerShellSessionHost : ISessionHost, IAsyncDisposable
OutputReceived?.Invoke(this, new TerminalOutputEventArgs(sessionId, chunk, ioEvent.Sequence)); OutputReceived?.Invoke(this, new TerminalOutputEventArgs(sessionId, chunk, ioEvent.Sequence));
} }
private async Task PumpOutputAsync(CancellationToken cancellationToken) private SessionOutputProcessor CreateOutputProcessor(string sessionId)
{ {
await foreach (var output in _outputChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) var channel = Channel.CreateUnbounded<string>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
});
var pumpTask = Task.Run(
() => PumpSessionOutputAsync(sessionId, channel.Reader, _disposeCancellation.Token),
CancellationToken.None);
return new SessionOutputProcessor(channel, pumpTask);
}
private async Task DisposeOutputProcessorAsync(string sessionId)
{
if (!_outputProcessors.TryRemove(sessionId, out var processor))
{
return;
}
processor.Channel.Writer.TryComplete();
try
{
await processor.PumpTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{ {
await PublishOutputAsync(output.SessionId, output.Chunk).ConfigureAwait(false);
} }
} }
private async Task PumpSessionOutputAsync(
string sessionId,
ChannelReader<string> reader,
CancellationToken cancellationToken)
{
await foreach (var chunk in reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
try
{
await PublishOutputAsync(sessionId, chunk).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
return;
}
catch
{
// Keep one broken session from poisoning every other session's output path.
}
}
}
private sealed record SessionOutputProcessor(
Channel<string> Channel,
Task PumpTask);
} }

View File

@ -7,6 +7,8 @@ using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using TermRemoteCtl.Agent.Configuration;
using TermRemoteCtl.Agent.Sessions; using TermRemoteCtl.Agent.Sessions;
using TermRemoteCtl.Agent.Terminal; using TermRemoteCtl.Agent.Terminal;
@ -169,6 +171,7 @@ public sealed class TerminalWebSocketHandlerTests
CancellationToken.None)) CancellationToken.None))
{ {
_ = await ReceiveTextAsync(socket, CancellationToken.None); _ = await ReceiveTextAsync(socket, CancellationToken.None);
_ = await ReceiveTextAsync(socket, CancellationToken.None);
var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir" }); var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir" });
await socket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None); await socket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None);
@ -215,6 +218,70 @@ public sealed class TerminalWebSocketHandlerTests
Assert.Contains("\"sequence\":2", restoreFrame); Assert.Contains("\"sequence\":2", restoreFrame);
} }
[Fact]
[Trait("Track", "Mainline")]
public async Task Input_Acknowledgement_Deduplicates_Replayed_Client_Input()
{
await using var fixture = new TerminalApiFixture();
var registry = fixture.Services.GetRequiredService<SessionRegistry>();
fixture.TerminalHost.Registry = registry;
var session = registry.Create("Shell", DateTimeOffset.UtcNow);
using WebSocket firstSocket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
CancellationToken.None);
_ = await ReceiveTextAsync(firstSocket, CancellationToken.None);
_ = await ReceiveTextAsync(firstSocket, CancellationToken.None);
var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir", inputId = "input-1" });
await firstSocket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None);
var firstAckFrame = await ReceiveTextAsync(firstSocket, CancellationToken.None);
Assert.Contains("\"type\":\"inputAck\"", firstAckFrame);
Assert.Contains("\"inputId\":\"input-1\"", firstAckFrame);
using WebSocket secondSocket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
CancellationToken.None);
_ = await ReceiveTextAsync(secondSocket, CancellationToken.None);
_ = await ReceiveTextAsync(secondSocket, CancellationToken.None);
await secondSocket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, CancellationToken.None);
var duplicateAckFrame = await ReceiveTextAsync(secondSocket, CancellationToken.None);
Assert.Contains("\"type\":\"inputAck\"", duplicateAckFrame);
Assert.Contains("\"inputId\":\"input-1\"", duplicateAckFrame);
Assert.Single(fixture.TerminalHost.Inputs.Where(item => item == ("input", session.SessionId, "dir")));
}
[Fact]
[Trait("Track", "Mainline")]
public async Task Attach_Includes_Screen_Snapshot_When_Backend_Screen_Protocol_Is_Enabled()
{
await using var fixture = new TerminalApiFixture(enableBackendScreenProtocol: true);
var registry = fixture.Services.GetRequiredService<SessionRegistry>();
fixture.TerminalHost.Registry = registry;
var session = registry.Create("Shell", DateTimeOffset.UtcNow);
await registry.RecordResizeAsync(session.SessionId, 40, 10, CancellationToken.None);
await registry.RecordOutputAsync(session.SessionId, "\u001b[2J\u001b[Hprompt> dir", CancellationToken.None);
using WebSocket socket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
CancellationToken.None);
_ = await ReceiveTextAsync(socket, CancellationToken.None);
var restoreFrame = await ReceiveTextAsync(socket, CancellationToken.None);
var restorePayload = JsonSerializer.Deserialize<TerminalRestoreResponse>(
restoreFrame,
new JsonSerializerOptions(JsonSerializerDefaults.Web));
Assert.NotNull(restorePayload);
Assert.NotNull(restorePayload!.ScreenSnapshot);
Assert.Equal("primary", restorePayload.ScreenSnapshot!.ActiveBuffer);
Assert.Equal(2L, restorePayload.ScreenSnapshot.SourceSequence);
}
private static async Task<string> ReceiveTextAsync(WebSocket socket, CancellationToken cancellationToken) private static async Task<string> ReceiveTextAsync(WebSocket socket, CancellationToken cancellationToken)
{ {
var buffer = new byte[4096]; var buffer = new byte[4096];
@ -264,10 +331,12 @@ public sealed class TerminalWebSocketHandlerTests
private readonly string _dataRoot = Path.Combine(Path.GetTempPath(), "TermRemoteCtl.Tests", Guid.NewGuid().ToString("N")); private readonly string _dataRoot = Path.Combine(Path.GetTempPath(), "TermRemoteCtl.Tests", Guid.NewGuid().ToString("N"));
private readonly TestTerminalSessionHost _terminalHost; private readonly TestTerminalSessionHost _terminalHost;
private readonly RecordingTerminalDiagnosticsSink _diagnostics = new(); private readonly RecordingTerminalDiagnosticsSink _diagnostics = new();
private readonly bool _enableBackendScreenProtocol;
public TerminalApiFixture() public TerminalApiFixture(bool enableBackendScreenProtocol = false)
{ {
_terminalHost = new TestTerminalSessionHost(); _terminalHost = new TestTerminalSessionHost();
_enableBackendScreenProtocol = enableBackendScreenProtocol;
} }
public TestTerminalSessionHost TerminalHost => _terminalHost; public TestTerminalSessionHost TerminalHost => _terminalHost;
@ -285,11 +354,21 @@ public sealed class TerminalWebSocketHandlerTests
["Agent:BindAddress"] = "127.0.0.1", ["Agent:BindAddress"] = "127.0.0.1",
["Agent:HttpsPort"] = "9443", ["Agent:HttpsPort"] = "9443",
["Agent:WebSocketFrameFlushMilliseconds"] = "33", ["Agent:WebSocketFrameFlushMilliseconds"] = "33",
["Agent:RingBufferLineLimit"] = "4000" ["Agent:RingBufferLineLimit"] = "4000",
["Agent:EnableBackendScreenProtocol"] = _enableBackendScreenProtocol.ToString()
}); });
}); });
builder.ConfigureServices(services => builder.ConfigureServices(services =>
{ {
services.PostConfigure<AgentOptions>(options =>
{
options.DataRoot = _dataRoot;
options.BindAddress = "127.0.0.1";
options.HttpsPort = 9443;
options.WebSocketFrameFlushMilliseconds = 33;
options.RingBufferLineLimit = 4000;
options.EnableBackendScreenProtocol = _enableBackendScreenProtocol;
});
services.RemoveAll<ISessionHost>(); services.RemoveAll<ISessionHost>();
services.AddSingleton(_terminalHost); services.AddSingleton(_terminalHost);
services.AddSingleton<ISessionHost>(sp => sp.GetRequiredService<TestTerminalSessionHost>()); services.AddSingleton<ISessionHost>(sp => sp.GetRequiredService<TestTerminalSessionHost>());
@ -300,9 +379,26 @@ public sealed class TerminalWebSocketHandlerTests
public new async ValueTask DisposeAsync() public new async ValueTask DisposeAsync()
{ {
await base.DisposeAsync(); await base.DisposeAsync();
if (Directory.Exists(_dataRoot)) if (!Directory.Exists(_dataRoot))
{ {
Directory.Delete(_dataRoot, true); return;
}
for (var attempt = 0; attempt < 5; attempt += 1)
{
try
{
Directory.Delete(_dataRoot, true);
return;
}
catch (IOException) when (attempt < 4)
{
await Task.Delay(50);
}
catch (UnauthorizedAccessException) when (attempt < 4)
{
await Task.Delay(50);
}
} }
} }
} }
@ -405,8 +501,14 @@ public sealed class TerminalWebSocketHandlerTests
string PendingInput, string PendingInput,
int? CursorRow, int? CursorRow,
int? CursorColumn, int? CursorColumn,
TerminalScreenSnapshotResponse? ScreenSnapshot,
string Type); string Type);
private sealed record TerminalScreenSnapshotResponse(
long ScreenVersion,
long SourceSequence,
string ActiveBuffer);
private sealed record TerminalOutputResponse( private sealed record TerminalOutputResponse(
string SessionId, string SessionId,
long Sequence, long Sequence,

View File

@ -110,6 +110,53 @@ public class PowerShellSessionHostTests
Assert.True(factory.CreatedSessions.Last().StartCount > 0); Assert.True(factory.CreatedSessions.Last().StartCount > 0);
} }
[Fact]
public async Task Output_Failure_From_One_Session_Does_Not_Block_Other_Sessions()
{
var factory = new FakeConPtySessionFactory();
using var harness = HostHarness.Create(factory);
await using var host = harness.Host;
var blockedSession = harness.Registry.Create("blocked", DateTimeOffset.UtcNow);
var healthySession = harness.Registry.Create("healthy", DateTimeOffset.UtcNow);
await host.StartAsync(blockedSession.SessionId, CancellationToken.None);
await host.StartAsync(healthySession.SessionId, CancellationToken.None);
var blockedLogPath = Path.Combine(
harness.DataRoot,
"sessions",
$"{blockedSession.SessionId}.log");
Directory.CreateDirectory(Path.GetDirectoryName(blockedLogPath)!);
await using var blockingStream = new FileStream(
blockedLogPath,
FileMode.OpenOrCreate,
FileAccess.ReadWrite,
FileShare.None);
using var received = new ManualResetEventSlim(false);
var outputs = new List<TerminalOutputEventArgs>();
host.OutputReceived += (_, args) =>
{
lock (outputs)
{
outputs.Add(args);
if (args.SessionId == healthySession.SessionId)
{
received.Set();
}
}
};
factory.CreatedSessions[0].EmitOutput(blockedSession.SessionId, "blocked-output");
factory.CreatedSessions[1].EmitOutput(healthySession.SessionId, "healthy-output");
Assert.True(received.Wait(TimeSpan.FromSeconds(2)));
Assert.Contains(
outputs,
item => item.SessionId == healthySession.SessionId &&
item.Chunk == "healthy-output");
}
private sealed class HostHarness : IDisposable private sealed class HostHarness : IDisposable
{ {
private HostHarness(string dataRoot, SessionRegistry registry, PowerShellSessionHost host) private HostHarness(string dataRoot, SessionRegistry registry, PowerShellSessionHost host)
@ -148,7 +195,22 @@ public class PowerShellSessionHostTests
{ {
if (Directory.Exists(DataRoot)) if (Directory.Exists(DataRoot))
{ {
Directory.Delete(DataRoot, true); for (var attempt = 0; attempt < 5; attempt += 1)
{
try
{
Directory.Delete(DataRoot, true);
break;
}
catch (IOException) when (attempt < 4)
{
Thread.Sleep(50);
}
catch (UnauthorizedAccessException) when (attempt < 4)
{
Thread.Sleep(50);
}
}
} }
} }
} }