// TermRemoteCtl/apps/windows_agent/tests/TermRemoteCtl.Agent.IntegrationTests/Realtime/TerminalWebSocketHandlerTests.cs
// 317 lines, 12 KiB, C#

using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using TermRemoteCtl.Agent.Sessions;
using TermRemoteCtl.Agent.Terminal;
namespace TermRemoteCtl.Agent.IntegrationTests.Realtime;
public sealed class TerminalWebSocketHandlerTests
{
/// <summary>
/// Verifies that attaching to a session yields an "attached" acknowledgement, that output
/// emitted by the session host is streamed to the socket, and that input sent by the client
/// reaches the session host and is recorded by the diagnostics sink.
/// </summary>
[Fact]
public async Task Attach_Streams_Output_And_Forwards_Input()
{
    await using var fixture = new TerminalApiFixture();
    var registry = fixture.Services.GetRequiredService<SessionRegistry>();
    var session = registry.Create("Shell", DateTimeOffset.UtcNow);

    // Bound every socket operation so a frame that never arrives fails the test
    // instead of hanging the whole test run.
    using var socketTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    using WebSocket socket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
        new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
        socketTimeout.Token);

    // The first frame is the attach acknowledgement for the requested session.
    var attachedFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    var attachedPayload = JsonSerializer.Deserialize<TerminalAttachResponse>(
        attachedFrame,
        new JsonSerializerOptions(JsonSerializerDefaults.Web));
    Assert.NotNull(attachedPayload);
    Assert.Equal("attached", attachedPayload!.Type);
    Assert.Equal(session.SessionId, attachedPayload.SessionId);

    // Two chunks emitted back-to-back are expected to arrive as one combined frame.
    fixture.TerminalHost.EmitOutput(session.SessionId, "abc");
    fixture.TerminalHost.EmitOutput(session.SessionId, "def");
    var outputFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    Assert.Equal("abcdef", outputFrame);

    // Input sent over the socket must reach the host and be reported to diagnostics.
    var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir" });
    await socket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, socketTimeout.Token);
    await WaitForConditionAsync(
        () => fixture.TerminalHost.Inputs.Contains(("input", session.SessionId, "dir")),
        TimeSpan.FromSeconds(2));
    await WaitForConditionAsync(
        () => fixture.Diagnostics.Events.Contains(("backend.input.received", session.SessionId, "dir")),
        TimeSpan.FromSeconds(2));
}
/// <summary>
/// Verifies that output appended to the registry before a client attaches is replayed
/// to that client right after the "attached" acknowledgement.
/// </summary>
[Fact]
public async Task Attach_Replays_Recent_Output_For_Existing_Session()
{
    await using var fixture = new TerminalApiFixture();
    var registry = fixture.Services.GetRequiredService<SessionRegistry>();
    var session = registry.Create("Shell", DateTimeOffset.UtcNow);
    await registry.AppendOutputAsync(session.SessionId, "prompt> dir\r\nnext> ", CancellationToken.None);

    // Bound every socket operation so a frame that never arrives fails the test
    // instead of hanging the whole test run.
    using var socketTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    using WebSocket socket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
        new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
        socketTimeout.Token);

    var attachedFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    var attachedPayload = JsonSerializer.Deserialize<TerminalAttachResponse>(
        attachedFrame,
        new JsonSerializerOptions(JsonSerializerDefaults.Web));
    Assert.NotNull(attachedPayload);
    Assert.Equal("attached", attachedPayload!.Type);

    // The frame following the acknowledgement carries the previously recorded output verbatim.
    var replayFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    Assert.Equal("prompt> dir\r\nnext> ", replayFrame);
}
/// <summary>
/// Verifies that a chunk produced at the moment the handler subscribes to live output is
/// delivered exactly once: after the replayed history and without a duplicate frame.
/// </summary>
[Fact]
public async Task Attach_Does_Not_Duplicate_Output_Produced_During_Replay_Boundary()
{
    await using var fixture = new TerminalApiFixture();
    var registry = fixture.Services.GetRequiredService<SessionRegistry>();

    // With a registry attached, the test host also appends every emitted chunk to it,
    // so the boundary chunk is present both in history and in the live event.
    fixture.TerminalHost.Registry = registry;
    var session = registry.Create("Shell", DateTimeOffset.UtcNow);
    await registry.AppendOutputAsync(session.SessionId, "prompt> dir\r\n", CancellationToken.None);

    // Queue a chunk that the test host emits as soon as something subscribes to OutputReceived.
    fixture.TerminalHost.EmitOutputOnNextSubscription(session.SessionId, "next> ");

    // Bound every socket operation so a frame that never arrives fails the test
    // instead of hanging the whole test run.
    using var socketTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    using WebSocket socket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
        new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
        socketTimeout.Token);

    var attachedFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    var attachedPayload = JsonSerializer.Deserialize<TerminalAttachResponse>(
        attachedFrame,
        new JsonSerializerOptions(JsonSerializerDefaults.Web));
    Assert.NotNull(attachedPayload);
    Assert.Equal("attached", attachedPayload!.Type);

    var replayFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    Assert.Equal("prompt> dir\r\n", replayFrame);

    var liveFrame = await ReceiveTextAsync(socket, socketTimeout.Token);
    Assert.Equal("next> ", liveFrame);

    // Nothing else may follow within the quiet window; a further frame would be a duplicate.
    await AssertNoAdditionalTextFrameAsync(socket, TimeSpan.FromMilliseconds(200));
}
/// <summary>
/// Verifies that input typed on a first connection is replayed to a second connection to
/// the same session even though the session host never produced any output for it.
/// </summary>
[Fact]
public async Task Reattach_Replays_Visible_User_Input_When_No_Output_Echo_Has_Arrived_Yet()
{
    await using var fixture = new TerminalApiFixture();
    var registry = fixture.Services.GetRequiredService<SessionRegistry>();
    var session = registry.Create("Shell", DateTimeOffset.UtcNow);

    // Bound every socket operation so a frame that never arrives fails the test
    // instead of hanging the whole test run.
    using var socketTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    // First connection: send input, wait until the host has seen it, then drop the socket.
    using (WebSocket socket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
        new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
        socketTimeout.Token))
    {
        // Skip the attach acknowledgement.
        _ = await ReceiveTextAsync(socket, socketTimeout.Token);
        var inputMessage = JsonSerializer.Serialize(new { type = "input", input = "dir" });
        await socket.SendAsync(Encoding.UTF8.GetBytes(inputMessage), WebSocketMessageType.Text, true, socketTimeout.Token);
        await WaitForConditionAsync(
            () => fixture.TerminalHost.Inputs.Contains(("input", session.SessionId, "dir")),
            TimeSpan.FromSeconds(2));
    }

    // Second connection: after the acknowledgement, the typed text is expected as the replay frame.
    using WebSocket replaySocket = await fixture.Server.CreateWebSocketClient().ConnectAsync(
        new Uri($"ws://localhost/ws/terminal?sessionId={session.SessionId}"),
        socketTimeout.Token);
    _ = await ReceiveTextAsync(replaySocket, socketTimeout.Token);
    var replayFrame = await ReceiveTextAsync(replaySocket, socketTimeout.Token);
    Assert.Equal("dir", replayFrame);
}
/// <summary>
/// Reads one complete WebSocket message, reassembling it from as many fragments as needed,
/// and decodes the accumulated bytes as UTF-8.
/// </summary>
/// <param name="socket">The socket to read from.</param>
/// <param name="cancellationToken">Token passed to every underlying receive call.</param>
/// <returns>The decoded message text.</returns>
/// <exception cref="InvalidOperationException">
/// A close frame was received before the message completed.
/// </exception>
private static async Task<string> ReceiveTextAsync(WebSocket socket, CancellationToken cancellationToken)
{
    var chunk = new byte[4096];
    using var assembled = new MemoryStream();

    for (; ; )
    {
        var fragment = await socket.ReceiveAsync(chunk, cancellationToken);

        if (fragment.MessageType == WebSocketMessageType.Close)
        {
            throw new InvalidOperationException("Socket closed before a text message was received.");
        }

        // Only the first fragment.Count bytes of the scratch buffer belong to this fragment.
        assembled.Write(chunk, 0, fragment.Count);

        if (!fragment.EndOfMessage)
        {
            continue;
        }

        var messageBytes = assembled.ToArray();
        return Encoding.UTF8.GetString(messageBytes);
    }
}
/// <summary>
/// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c> or
/// <paramref name="timeout"/> elapses.
/// </summary>
/// <param name="condition">Predicate to poll; it is always evaluated at least once.</param>
/// <param name="timeout">Maximum time to keep polling.</param>
/// <exception cref="TimeoutException">
/// The condition was still false on the check made at or after the deadline.
/// </exception>
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
{
    var deadline = DateTimeOffset.UtcNow.Add(timeout);
    while (true)
    {
        // Check the condition before looking at the clock: this guarantees one evaluation
        // even for a zero timeout, and a final evaluation after the last delay, so a
        // condition that became true during that delay is not misreported as a timeout.
        if (condition())
        {
            return;
        }

        if (DateTimeOffset.UtcNow >= deadline)
        {
            break;
        }

        await Task.Delay(25);
    }

    throw new TimeoutException("Condition was not met.");
}
/// <summary>
/// Asserts that no further message arrives on <paramref name="socket"/> within
/// <paramref name="timeout"/>. The receive is cancelled when the window ends, and that
/// cancellation is the expected outcome.
/// </summary>
/// <param name="socket">The socket that is expected to stay silent.</param>
/// <param name="timeout">Length of the quiet window.</param>
private static async Task AssertNoAdditionalTextFrameAsync(WebSocket socket, TimeSpan timeout)
{
    using var quietWindow = new CancellationTokenSource(timeout);
    var quietToken = quietWindow.Token;

    // If a frame does arrive, the receive completes normally and the assertion fails.
    await Assert.ThrowsAnyAsync<OperationCanceledException>(
        () => ReceiveTextAsync(socket, quietToken));
}
/// <summary>
/// Test server factory that points the agent at a unique temporary data root and swaps the
/// session host and diagnostics sink for in-memory test doubles.
/// </summary>
private sealed class TerminalApiFixture : WebApplicationFactory<Program>
{
    // Unique per fixture instance so tests never share on-disk state.
    private readonly string _dataRoot = Path.Combine(Path.GetTempPath(), "TermRemoteCtl.Tests", Guid.NewGuid().ToString("N"));

    /// <summary>Fake session host registered as the application's <c>ISessionHost</c>.</summary>
    public TestTerminalSessionHost TerminalHost { get; } = new();

    /// <summary>Diagnostics sink that records every event the application reports.</summary>
    public RecordingTerminalDiagnosticsSink Diagnostics { get; } = new();

    /// <summary>
    /// Applies the in-memory agent configuration and replaces the session host and
    /// diagnostics registrations with the test doubles.
    /// </summary>
    protected override void ConfigureWebHost(IWebHostBuilder builder)
    {
        builder.UseEnvironment("Development");
        builder.ConfigureAppConfiguration((_, configBuilder) =>
        {
            configBuilder.AddInMemoryCollection(new Dictionary<string, string?>
            {
                ["Agent:DataRoot"] = _dataRoot,
                ["Agent:BindAddress"] = "127.0.0.1",
                ["Agent:HttpsPort"] = "9443",
                ["Agent:WebSocketFrameFlushMilliseconds"] = "33",
                ["Agent:RingBufferLineLimit"] = "4000"
            });
        });
        builder.ConfigureServices(services =>
        {
            // Drop existing ISessionHost registrations so only the test double is resolved.
            services.RemoveAll<ISessionHost>();
            services.AddSingleton(TerminalHost);
            services.AddSingleton<ISessionHost>(sp => sp.GetRequiredService<TestTerminalSessionHost>());
            services.AddSingleton<ITerminalDiagnosticsSink>(Diagnostics);
        });
    }

    /// <summary>
    /// Disposes the test server and then removes the temporary data root.
    /// </summary>
    /// <remarks>
    /// Overrides (rather than hides) the base method so the cleanup also runs when the
    /// fixture is disposed through a base-type or <see cref="IAsyncDisposable"/> reference.
    /// </remarks>
    public override async ValueTask DisposeAsync()
    {
        try
        {
            await base.DisposeAsync();
        }
        finally
        {
            // Remove the temp directory even if shutting the host down failed.
            if (Directory.Exists(_dataRoot))
            {
                Directory.Delete(_dataRoot, true);
            }
        }
    }
}
/// <summary>
/// Diagnostics sink that keeps every recorded event in memory for later assertions.
/// </summary>
/// <remarks>
/// Events are recorded by the application while the test polls <see cref="Events"/>,
/// so all access to the backing list is serialized and readers receive a snapshot.
/// </remarks>
private sealed class RecordingTerminalDiagnosticsSink : ITerminalDiagnosticsSink
{
    private readonly object _gate = new();
    private readonly List<(string EventName, string SessionId, string Detail)> _events = [];

    /// <summary>Snapshot of the events recorded so far, in recording order.</summary>
    public IReadOnlyList<(string EventName, string SessionId, string Detail)> Events
    {
        get
        {
            lock (_gate)
            {
                // Copy so callers never enumerate a list that is being appended to.
                return _events.ToArray();
            }
        }
    }

    /// <summary>Appends one event to the recording.</summary>
    public void Record(string eventName, string sessionId, string detail)
    {
        lock (_gate)
        {
            _events.Add((eventName, sessionId, detail));
        }
    }
}
/// <summary>
/// In-memory <c>ISessionHost</c> test double. It records input and resize calls and lets
/// a test raise <see cref="OutputReceived"/> on demand.
/// </summary>
/// <remarks>
/// The host is called by the application while the test polls <see cref="Inputs"/>, so all
/// mutable state is guarded by a single lock and readers receive snapshots.
/// </remarks>
private sealed class TestTerminalSessionHost : ISessionHost
{
    private readonly object _gate = new();
    private readonly List<(string Kind, string SessionId, string Value)> _inputs = new();
    private EventHandler<TerminalOutputEventArgs>? _outputReceived;
    private (string SessionId, string Chunk)? _pendingSubscriptionEmission;

    /// <summary>
    /// Raised by <see cref="EmitOutput"/>. Subscribing also flushes a chunk queued with
    /// <see cref="EmitOutputOnNextSubscription"/>, if there is one.
    /// </summary>
    public event EventHandler<TerminalOutputEventArgs>? OutputReceived
    {
        add
        {
            (string SessionId, string Chunk)? emission;
            lock (_gate)
            {
                _outputReceived += value;

                // Take the queued chunk exactly once; later subscriptions see nothing pending.
                emission = _pendingSubscriptionEmission;
                _pendingSubscriptionEmission = null;
            }

            // Emit outside the lock so handlers never run while the gate is held.
            if (emission is { } pending)
            {
                EmitOutput(pending.SessionId, pending.Chunk);
            }
        }
        remove
        {
            lock (_gate)
            {
                _outputReceived -= value;
            }
        }
    }

    /// <summary>
    /// Optional registry. When set, <see cref="EmitOutput"/> appends each chunk to it
    /// before raising the event.
    /// </summary>
    public SessionRegistry? Registry { get; set; }

    /// <summary>Snapshot of the recorded input and resize calls, in call order.</summary>
    public IReadOnlyList<(string Kind, string SessionId, string Value)> Inputs
    {
        get
        {
            lock (_gate)
            {
                // Copy so callers never enumerate a list that is being appended to.
                return _inputs.ToArray();
            }
        }
    }

    /// <summary>No-op; the fake has no process to start.</summary>
    public Task StartAsync(string sessionId, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op; the fake has no process to stop.</summary>
    public Task StopAsync(string sessionId, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Records the call as an <c>("input", sessionId, input)</c> entry.</summary>
    public Task WriteInputAsync(string sessionId, string input, CancellationToken cancellationToken)
    {
        lock (_gate)
        {
            _inputs.Add(("input", sessionId, input));
        }

        return Task.CompletedTask;
    }

    /// <summary>Records the call as a <c>("resize", sessionId, "{columns}x{rows}")</c> entry.</summary>
    public Task ResizeAsync(string sessionId, int columns, int rows, CancellationToken cancellationToken)
    {
        lock (_gate)
        {
            _inputs.Add(("resize", sessionId, $"{columns}x{rows}"));
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Queues a chunk to be emitted the next time a handler subscribes to
    /// <see cref="OutputReceived"/>. A later call replaces a chunk that is still queued.
    /// </summary>
    public void EmitOutputOnNextSubscription(string sessionId, string chunk)
    {
        lock (_gate)
        {
            _pendingSubscriptionEmission = (sessionId, chunk);
        }
    }

    /// <summary>
    /// Appends the chunk to <see cref="Registry"/> when one is set, then raises
    /// <see cref="OutputReceived"/> for it.
    /// </summary>
    public void EmitOutput(string sessionId, string chunk)
    {
        // Blocks until the append completes so the registry holds the chunk before
        // any subscriber is notified. This method is synchronous by contract.
        Registry?.AppendOutputAsync(sessionId, chunk, CancellationToken.None).GetAwaiter().GetResult();

        // Read the delegate under the same lock that guards subscription changes,
        // then invoke it outside the lock.
        EventHandler<TerminalOutputEventArgs>? handlers;
        lock (_gate)
        {
            handlers = _outputReceived;
        }

        handlers?.Invoke(this, new TerminalOutputEventArgs(sessionId, chunk));
    }
}
// Shape of the first frame each test reads after connecting. The tests deserialize it with
// JsonSerializerDefaults.Web and assert on Type ("attached") and, in one test, on SessionId.
private sealed record TerminalAttachResponse(string SessionId, string Type);
}