From 4d011995d1ede05e4d6b7576652eb8cde1644e48 Mon Sep 17 00:00:00 2001 From: Joel Forsyth <64228401+joelmforsyth@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:47:51 -0400 Subject: [PATCH 1/2] Release SSE response stream reference when GET request ends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit StreamableHttpServerTransport held a reference to the Kestrel SSE response stream via _httpSseWriter for the entire session lifetime. Since the transport is only disposed when the session itself is disposed (via explicit DELETE or idle timeout), clients that disconnect without sending DELETE — which is typical for long-lived SSE connections — left the response stream pinned for the remainder of the idle timeout window. Because the Kestrel HTTP connection (and its associated memory pool buffers, Pipe readers/writers, socket send/receive buffers) is only collectible once all references to the response stream are released, this pinned ~20MiB of unmanaged memory per disconnected session until cleanup. Under steady connect/disconnect churn (e.g., IDE availability probes) this accumulated into sustained memory growth that eventually OOMKilled the container. Fix: release _httpSseWriter from within HandleGetRequestAsync's finally block so the reference is dropped as soon as the GET request exits, not just when the session is disposed. SendMessageAsync handles the now-nullable field via the existing _getHttpResponseCompleted gate, and any server-to-client messages sent after disconnect are still captured by the event store writer for replay via Last-Event-ID. Add a unit test that verifies _httpSseWriter is null once HandleGetRequestAsync returns. Relates to #766. --- .../Server/StreamableHttpServerTransport.cs | 70 +++++++++++++------ .../StreamableHttpServerTransportTests.cs | 47 +++++++++++++ 2 files changed, 94 insertions(+), 23 deletions(-) create mode 100644 tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index 71c366e83..64a160cf7 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -137,33 +137,54 @@ public async Task HandleGetRequestAsync(Stream sseResponseStream, CancellationTo throw new InvalidOperationException("GET requests are not supported in stateless mode."); } - using (await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false)) + try { - if (_getHttpRequestStarted) + using (await _unsolicitedMessageLock.LockAsync(cancellationToken).ConfigureAwait(false)) { - throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session."); - } + if (_getHttpRequestStarted) + { + throw new InvalidOperationException("Session resumption is not yet supported. Please start a new session."); + } - _getHttpRequestStarted = true; - _httpSseWriter = new SseEventWriter(sseResponseStream); - _httpResponseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _storeSseWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false); - if (_storeSseWriter is not null) - { - var primingItem = await _storeSseWriter.WriteEventAsync(SseItem.Prime(), cancellationToken).ConfigureAwait(false); - await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false); + _getHttpRequestStarted = true; + _httpSseWriter = new SseEventWriter(sseResponseStream); + _httpResponseTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _storeSseWriter = await TryCreateEventStreamAsync(streamId: UnsolicitedMessageStreamId, cancellationToken).ConfigureAwait(false); + if (_storeSseWriter is not null) + { + var primingItem = await _storeSseWriter.WriteEventAsync(SseItem.Prime(), cancellationToken).ConfigureAwait(false); + await _httpSseWriter.WriteAsync(primingItem, cancellationToken).ConfigureAwait(false); + } + else + { + // If there's no priming write, flush the stream to ensure HTTP response headers are + // sent to the client now that the transport is ready to accept messages via SendMessageAsync. + await sseResponseStream.FlushAsync(cancellationToken).ConfigureAwait(false); + } } - else + + // Wait for the response to be written before returning from the handler. + // This keeps the HTTP response open until the final response message is sent. + await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + // Release the SseEventWriter's reference to the response stream promptly when the GET + // request ends, regardless of how it exits. Otherwise the response stream (and the + // underlying Kestrel connection and associated memory pool buffers) remains pinned + // in memory until the session itself is disposed (via explicit DELETE or idle timeout). + // Clients that disconnect without sending DELETE — common with long-lived SSE — would + // otherwise accumulate significant unmanaged memory per session during that interval. + using (await _unsolicitedMessageLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { - // If there's no priming write, flush the stream to ensure HTTP response headers are - // sent to the client now that the transport is ready to accept messages via SendMessageAsync. - await sseResponseStream.FlushAsync(cancellationToken).ConfigureAwait(false); + _getHttpResponseCompleted = true; + if (_httpSseWriter is { } writer) + { + _httpSseWriter = null; + writer.Dispose(); + } } } - - // Wait for the response to be written before returning from the handler. - // This keeps the HTTP response open until the final response message is sent. - await _httpResponseTcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false); } /// @@ -219,7 +240,10 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can return; } - Debug.Assert(_httpSseWriter is not null); + // _httpSseWriter may be null here if the GET request has already ended (e.g. client + // disconnected). _getHttpResponseCompleted is set to true in that case, so the write + // below is correctly skipped. The event store writer (if configured) still captures + // the message so a reconnecting client can replay it via Last-Event-ID. Debug.Assert(_httpResponseTcs is not null); var item = SseItem.Message(message); @@ -229,13 +253,13 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can item = await _storeSseWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false); } - if (!_getHttpResponseCompleted) + if (!_getHttpResponseCompleted && _httpSseWriter is { } writer) { // Only write the message to the response if the response has not completed. try { - await _httpSseWriter!.WriteAsync(item, cancellationToken).ConfigureAwait(false); + await writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); } catch (Exception ex) when (!cancellationToken.IsCancellationRequested) { diff --git a/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs b/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs new file mode 100644 index 000000000..77f322b31 --- /dev/null +++ b/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs @@ -0,0 +1,47 @@ +using ModelContextProtocol.Server; +using ModelContextProtocol.Tests.Utils; +using System.IO.Pipelines; +using System.Reflection; + +namespace ModelContextProtocol.Tests.Transport; + +public class StreamableHttpServerTransportTests(ITestOutputHelper testOutputHelper) : LoggedTest(testOutputHelper) +{ + [Fact] + public async Task HandleGetRequestAsync_ReleasesStreamReference_AfterRequestEnds() + { + await using var transport = new StreamableHttpServerTransport() + { + SessionId = "test-session", + }; + + var pipe = new Pipe(); + var responseStream = pipe.Writer.AsStream(); + + using var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(100)); + + try + { + await transport.HandleGetRequestAsync(responseStream, cts.Token); + } + catch (OperationCanceledException) + { + } + + // After the GET request handler returns, the transport must not retain a reference to the + // response stream via _httpSseWriter. Otherwise the Kestrel connection and its associated + // memory pool buffers (which can be ~20MiB per SSE session) stay pinned in unmanaged memory + // until the session is eventually disposed (via explicit DELETE or idle timeout), causing + // steady memory growth for servers whose clients disconnect without sending DELETE. + var httpSseWriterField = typeof(StreamableHttpServerTransport).GetField( + "_httpSseWriter", + BindingFlags.Instance | BindingFlags.NonPublic); + Assert.NotNull(httpSseWriterField); + var httpSseWriterValue = httpSseWriterField.GetValue(transport); + Assert.Null(httpSseWriterValue); + + await pipe.Reader.CompleteAsync(); + await pipe.Writer.CompleteAsync(); + } +} From 25f080115da01e36f8af67dc56bbb2675e4e85ba Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Thu, 28 May 2026 12:03:58 -0700 Subject: [PATCH 2/2] Apply review feedback: replace _getHttpResponseCompleted with _disposed; rewrite test - Remove _getHttpResponseCompleted and rely on _httpSseWriter being null as the signal that the GET response stream has been released. Null out _httpSseWriter in DisposeAsync so SendMessageAsync correctly skips writing post-dispose. - Replace the dispose-idempotency role of the old flag with a dedicated _disposed bool. - Move the explanatory comment in SendMessageAsync onto the event store branch (which intentionally still runs when the response stream is gone, to support Last-Event-ID replay). - Rewrite the unit test to assert observable behavior via a recording Stream instead of reflecting on private fields. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Server/StreamableHttpServerTransport.cs | 23 +++-- .../StreamableHttpServerTransportTests.cs | 88 ++++++++++++++----- 2 files changed, 76 insertions(+), 35 deletions(-) diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index 64a160cf7..cd39b2613 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -44,7 +44,7 @@ public sealed partial class StreamableHttpServerTransport : ITransport private TaskCompletionSource? _httpResponseTcs; private string? _negotiatedProtocolVersion; private bool _getHttpRequestStarted; - private bool _getHttpResponseCompleted; + private bool _disposed; /// /// Initializes a new instance of the class. @@ -177,7 +177,6 @@ public async Task HandleGetRequestAsync(Stream sseResponseStream, CancellationTo // otherwise accumulate significant unmanaged memory per session during that interval. using (await _unsolicitedMessageLock.LockAsync(CancellationToken.None).ConfigureAwait(false)) { - _getHttpResponseCompleted = true; if (_httpSseWriter is { } writer) { _httpSseWriter = null; @@ -240,23 +239,19 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can return; } - // _httpSseWriter may be null here if the GET request has already ended (e.g. client - // disconnected). _getHttpResponseCompleted is set to true in that case, so the write - // below is correctly skipped. The event store writer (if configured) still captures - // the message so a reconnecting client can replay it via Last-Event-ID. Debug.Assert(_httpResponseTcs is not null); var item = SseItem.Message(message); if (_storeSseWriter is not null) { + // Always record the message in the event store (if configured) — even when the GET + // response stream is gone — so a reconnecting client can replay it via Last-Event-ID. item = await _storeSseWriter.WriteEventAsync(item, cancellationToken).ConfigureAwait(false); } - if (!_getHttpResponseCompleted && _httpSseWriter is { } writer) + if (_httpSseWriter is { } writer) { - // Only write the message to the response if the response has not completed. - try { await writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); @@ -273,12 +268,12 @@ public async ValueTask DisposeAsync() { using var _ = await _unsolicitedMessageLock.LockAsync().ConfigureAwait(false); - if (_getHttpResponseCompleted) + if (_disposed) { return; } - _getHttpResponseCompleted = true; + _disposed = true; try { @@ -290,7 +285,11 @@ public async ValueTask DisposeAsync() try { _httpResponseTcs?.TrySetResult(true); - _httpSseWriter?.Dispose(); + if (_httpSseWriter is { } writer) + { + _httpSseWriter = null; + writer.Dispose(); + } if (_storeSseWriter is not null) { diff --git a/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs b/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs index 77f322b31..ce2147e27 100644 --- a/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs +++ b/tests/ModelContextProtocol.Tests/Transport/StreamableHttpServerTransportTests.cs @@ -1,47 +1,89 @@ +using ModelContextProtocol.Protocol; using ModelContextProtocol.Server; using ModelContextProtocol.Tests.Utils; -using System.IO.Pipelines; -using System.Reflection; namespace ModelContextProtocol.Tests.Transport; public class StreamableHttpServerTransportTests(ITestOutputHelper testOutputHelper) : LoggedTest(testOutputHelper) { [Fact] - public async Task HandleGetRequestAsync_ReleasesStreamReference_AfterRequestEnds() + public async Task SendMessageAsync_AfterGetRequestEnds_DoesNotWriteToResponseStream() { + // Regression test for the SSE response stream being retained after the GET request + // handler returns. Without releasing the stream reference, the Kestrel connection + // and its associated memory pool buffers (~20MiB per SSE session) stay pinned in + // unmanaged memory until the session is eventually disposed (via explicit DELETE or + // idle timeout), causing steady memory growth for servers whose clients disconnect + // without sending DELETE. After the GET handler returns, SendMessageAsync must not + // attempt to write to the (now released) response stream. + await using var transport = new StreamableHttpServerTransport() { SessionId = "test-session", }; - var pipe = new Pipe(); - var responseStream = pipe.Writer.AsStream(); + var responseStream = new RecordingStream(); using var cts = new CancellationTokenSource(); - cts.CancelAfter(TimeSpan.FromMilliseconds(100)); + var getTask = transport.HandleGetRequestAsync(responseStream, cts.Token); + + // Wait until the GET handler has finished initialization (signaled by the initial + // flush that sends HTTP response headers) so we know _httpSseWriter is set. + await responseStream.FirstActivity.WaitAsync(TestConstants.DefaultTimeout, TestContext.Current.CancellationToken); + + var writeCountBeforeCancel = responseStream.WriteCount; + + cts.Cancel(); + await Assert.ThrowsAnyAsync(() => getTask); + + await transport.SendMessageAsync( + new JsonRpcNotification { Method = "test" }, + TestContext.Current.CancellationToken); + + Assert.Equal(writeCountBeforeCancel, responseStream.WriteCount); + } + + private sealed class RecordingStream : Stream + { + private readonly TaskCompletionSource _firstActivity = new(TaskCreationOptions.RunContinuationsAsynchronously); + private int _writeCount; - try + public Task FirstActivity => _firstActivity.Task; + public int WriteCount => Volatile.Read(ref _writeCount); + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => throw new NotSupportedException(); + public override long Position { - await transport.HandleGetRequestAsync(responseStream, cts.Token); + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); } - catch (OperationCanceledException) + + public override void Flush() => _firstActivity.TrySetResult(true); + + public override Task FlushAsync(CancellationToken cancellationToken) { + _firstActivity.TrySetResult(true); + return Task.CompletedTask; } - // After the GET request handler returns, the transport must not retain a reference to the - // response stream via _httpSseWriter. Otherwise the Kestrel connection and its associated - // memory pool buffers (which can be ~20MiB per SSE session) stay pinned in unmanaged memory - // until the session is eventually disposed (via explicit DELETE or idle timeout), causing - // steady memory growth for servers whose clients disconnect without sending DELETE. - var httpSseWriterField = typeof(StreamableHttpServerTransport).GetField( - "_httpSseWriter", - BindingFlags.Instance | BindingFlags.NonPublic); - Assert.NotNull(httpSseWriterField); - var httpSseWriterValue = httpSseWriterField.GetValue(transport); - Assert.Null(httpSseWriterValue); - - await pipe.Reader.CompleteAsync(); - await pipe.Writer.CompleteAsync(); + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + { + Interlocked.Increment(ref _writeCount); + _firstActivity.TrySetResult(true); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _writeCount); + _firstActivity.TrySetResult(true); + return Task.CompletedTask; + } } }