From b2b78ff6168a5fc542fa4c7a2334d98b340f4d14 Mon Sep 17 00:00:00 2001 From: GrinZero <774933704@qq.com> Date: Sat, 25 Apr 2026 00:34:06 +0800 Subject: [PATCH] http,inspector: add WebSocket upgrade observability --- doc/api/diagnostics_channel.md | 84 ++++ lib/_http_client.js | 11 + lib/_http_server.js | 16 + lib/inspector.js | 4 + lib/internal/http_websocket_observer.js | 442 ++++++++++++++++++ lib/internal/inspector/network_http.js | 189 +++++++- src/inspector/domain_network.pdl | 45 ++ src/inspector/network_agent.cc | 136 ++++++ src/inspector/network_agent.h | 10 + ...s-channel-http-server-upgrade-websocket.js | 274 +++++++++++ ...gnostics-channel-http-upgrade-websocket.js | 245 ++++++++++ ...st-inspector-emit-protocol-event-errors.js | 126 +++++ .../test-inspector-emit-protocol-event.js | 35 ++ ...nspector-network-http-upgrade-websocket.js | 151 ++++++ 14 files changed, 1754 insertions(+), 14 deletions(-) create mode 100644 lib/internal/http_websocket_observer.js create mode 100644 test/parallel/test-diagnostics-channel-http-server-upgrade-websocket.js create mode 100644 test/parallel/test-diagnostics-channel-http-upgrade-websocket.js create mode 100644 test/parallel/test-inspector-network-http-upgrade-websocket.js diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index 038c7cb29dd2d1..e01c4ca866c201 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -1590,6 +1590,18 @@ Emitted when an error occurs during a client request. Emitted when client receives a response. +##### Event: `'http.client.request.upgrade'` + +* `request` {http.ClientRequest} +* `response` {http.IncomingMessage} +* `socket` {net.Socket} +* `head` {Buffer} +* `protocol` {string} + +Emitted when an HTTP/1.1 client request upgrades to WebSocket. +The `head` value matches the `upgradeHead` argument emitted with the request's +`'upgrade'` event. + ##### Event: `'http.server.request.start'` * `request` {http.IncomingMessage} @@ -1616,6 +1628,78 @@ The event is emitted before the response is sent. Emitted when server sends a response. +##### Event: `'http.server.request.upgrade'` + +* `request` {http.IncomingMessage} +* `socketOrStream` {net.Socket|stream.Duplex} +* `head` {Buffer} +* `protocol` {string} + +Emitted when an HTTP/1.1 server request upgrades to WebSocket. +The `socketOrStream` value matches the upgraded object exposed to the server's +`'upgrade'` event. + +#### WebSocket + +> Stability: 1 - Experimental + +The following WebSocket diagnostics channels currently observe HTTP/1.1 +WebSocket upgrades handled by the built-in `http` and `https` modules. + +##### Event: `'websocket.client.frameSent'` + +* `request` {http.ClientRequest} +* `response` {http.IncomingMessage} +* `socket` {net.Socket} +* `opcode` {number} +* `fin` {boolean} +* `masked` {boolean} +* `compressed` {boolean} +* `payloadLength` {number} +* `payload` {Buffer} + +Emitted when a complete WebSocket frame sent by the client has been observed. + +##### Event: `'websocket.client.frameReceived'` + +* `request` {http.ClientRequest} +* `response` {http.IncomingMessage} +* `socket` {net.Socket} +* `opcode` {number} +* `fin` {boolean} +* `masked` {boolean} +* `compressed` {boolean} +* `payloadLength` {number} +* `payload` {Buffer} + +Emitted when a complete WebSocket frame received by the client has been observed. + +##### Event: `'websocket.server.frameSent'` + +* `request` {http.IncomingMessage} +* `socket` {net.Socket} +* `opcode` {number} +* `fin` {boolean} +* `masked` {boolean} +* `compressed` {boolean} +* `payloadLength` {number} +* `payload` {Buffer} + +Emitted when a complete WebSocket frame sent by the server has been observed. + +##### Event: `'websocket.server.frameReceived'` + +* `request` {http.IncomingMessage} +* `socket` {net.Socket} +* `opcode` {number} +* `fin` {boolean} +* `masked` {boolean} +* `compressed` {boolean} +* `payloadLength` {number} +* `payload` {Buffer} + +Emitted when a complete WebSocket frame received by the server has been observed. + #### HTTP/2 > Stability: 1 - Experimental diff --git a/lib/_http_client.js b/lib/_http_client.js index c14e899dabbf04..a32067e96cb432 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -99,6 +99,13 @@ const onClientRequestStartChannel = dc.channel('http.client.request.start'); const onClientRequestErrorChannel = dc.channel('http.client.request.error'); const onClientResponseFinishChannel = dc.channel('http.client.response.finish'); +let observeClientWebSocketUpgrade; +function lazyObserveClientWebSocketUpgrade() { + observeClientWebSocketUpgrade ??= + require('internal/http_websocket_observer').observeClientWebSocketUpgrade; + return observeClientWebSocketUpgrade; +} + function emitErrorEvent(request, error) { if (onClientRequestErrorChannel.hasSubscribers) { onClientRequestErrorChannel.publish({ @@ -670,6 +677,10 @@ function socketOnData(d) { socket._httpMessage = null; socket.readableFlowing = null; + if (eventName === 'upgrade') { + lazyObserveClientWebSocketUpgrade()(req, res, socket, bodyHead); + } + req.emit(eventName, res, socket, bodyHead); req.destroyed = true; req._closed = true; diff --git a/lib/_http_server.js b/lib/_http_server.js index 68db87b094d960..a0e504a3904b06 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -107,6 +107,13 @@ const onRequestStartChannel = dc.channel('http.server.request.start'); const onResponseCreatedChannel = dc.channel('http.server.response.created'); const onResponseFinishChannel = dc.channel('http.server.response.finish'); +let observeServerWebSocketUpgrade; +function lazyObserveServerWebSocketUpgrade() { + observeServerWebSocketUpgrade ??= + require('internal/http_websocket_observer').observeServerWebSocketUpgrade; + return observeServerWebSocketUpgrade; +} + const kServerResponse = Symbol('ServerResponse'); const kServerResponseStatistics = Symbol('ServerResponseStatistics'); const kUpgradeStream = Symbol('UpgradeStream'); @@ -1090,6 +1097,15 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) { socket[kUpgradeStream].requestBodyCompleted(bodyHead); } else { socket.readableFlowing = null; + if (eventName === 'upgrade') { + lazyObserveServerWebSocketUpgrade()( + req, + upgradeStream, + socket, + bodyHead, + req.complete, + ); + } server.emit(eventName, req, upgradeStream, bodyHead); } } else { diff --git a/lib/inspector.js b/lib/inspector.js index 1c440794f3932f..5010b0f48a23cb 100644 --- a/lib/inspector.js +++ b/lib/inspector.js @@ -220,7 +220,11 @@ const Network = { dataSent: (params) => broadcastToFrontend('Network.dataSent', params), dataReceived: (params) => broadcastToFrontend('Network.dataReceived', params), webSocketCreated: (params) => broadcastToFrontend('Network.webSocketCreated', params), + webSocketWillSendHandshakeRequest: + (params) => broadcastToFrontend('Network.webSocketWillSendHandshakeRequest', params), webSocketClosed: (params) => broadcastToFrontend('Network.webSocketClosed', params), + webSocketFrameReceived: (params) => broadcastToFrontend('Network.webSocketFrameReceived', params), + webSocketFrameSent: (params) => broadcastToFrontend('Network.webSocketFrameSent', params), webSocketHandshakeResponseReceived: (params) => broadcastToFrontend('Network.webSocketHandshakeResponseReceived', params), }; diff --git a/lib/internal/http_websocket_observer.js b/lib/internal/http_websocket_observer.js new file mode 100644 index 00000000000000..08554cb4621b60 --- /dev/null +++ b/lib/internal/http_websocket_observer.js @@ -0,0 +1,442 @@ +'use strict'; + +const { + ArrayIsArray, + ArrayPrototypeIncludes, + Boolean, + NumberIsSafeInteger, + RegExpPrototypeExec, + ReflectApply, + Symbol, +} = primordials; + +const { Buffer } = require('buffer'); +const dc = require('diagnostics_channel'); +const { + isAnyArrayBuffer, + isArrayBufferView, +} = require('internal/util/types'); + +const onClientRequestUpgradeChannel = dc.channel('http.client.request.upgrade'); +const onClientFrameReceivedChannel = dc.channel('websocket.client.frameReceived'); +const onClientFrameSentChannel = dc.channel('websocket.client.frameSent'); +const onServerRequestUpgradeChannel = dc.channel('http.server.request.upgrade'); +const onServerFrameReceivedChannel = dc.channel('websocket.server.frameReceived'); +const onServerFrameSentChannel = dc.channel('websocket.server.frameSent'); + +const kObserverState = Symbol('kObserverState'); +const kWebSocketHeaderValueRegExp = /(?:^|,)\s*websocket\s*(?:,|$)/i; +const kEmptyBuffer = Buffer.alloc(0); +const kHttpHeaderTerminator = Buffer.from('\r\n\r\n'); + +function hasWebSocketHeader(value) { + if (ArrayIsArray(value)) { + for (let i = 0; i < value.length; i++) { + if (hasWebSocketHeader(value[i])) { + return true; + } + } + return false; + } + + return typeof value === 'string' && + RegExpPrototypeExec(kWebSocketHeaderValueRegExp, value) !== null; +} + +function isObservableClientWebSocketUpgrade(request, response) { + return response?.statusCode === 101 && + hasWebSocketHeader(response.headers?.upgrade); +} + +function toBuffer(chunk, encoding) { + if (chunk == null) { + return null; + } + + if (typeof chunk === 'string') { + return Buffer.from(chunk, encoding); + } + + if (isArrayBufferView(chunk)) { + return Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength); + } + + if (isAnyArrayBuffer(chunk)) { + return Buffer.from(chunk); + } + + return null; +} + +function unmask(payload, mask) { + const len = payload.length; + let i = 0; + for (; i + 3 < len; i += 4) { + payload[i] ^= mask[0]; + payload[i + 1] ^= mask[1]; + payload[i + 2] ^= mask[2]; + payload[i + 3] ^= mask[3]; + } + for (; i < len; i++) { + payload[i] ^= mask[i & 3]; + } +} + +class WebSocketFrameParser { + constructor(onFrame) { + this.buffer = kEmptyBuffer; + this.onFrame = onFrame; + this.failed = false; + } + + execute(chunk) { + if (this.failed) { + return; + } + + if (chunk.byteLength === 0) { + return; + } + + let buffer = this.buffer; + if (buffer.byteLength === 0) { + buffer = chunk; + } else { + buffer = Buffer.concat([buffer, chunk], buffer.byteLength + chunk.byteLength); + } + + let offset = 0; + while (buffer.byteLength - offset >= 2) { + const firstByte = buffer[offset]; + const secondByte = buffer[offset + 1]; + + let payloadLength = secondByte & 0x7f; + let cursor = offset + 2; + + if (payloadLength === 126) { + if (buffer.byteLength - offset < 4) { + break; + } + payloadLength = buffer.readUInt16BE(cursor); + cursor += 2; + } else if (payloadLength === 127) { + if (buffer.byteLength - offset < 10) { + break; + } + + const upper = buffer.readUInt32BE(cursor); + const lower = buffer.readUInt32BE(cursor + 4); + payloadLength = upper * 0x100000000 + lower; + cursor += 8; + + if (!NumberIsSafeInteger(payloadLength)) { + this.failed = true; + this.buffer = kEmptyBuffer; + return; + } + } + + let mask; + const masked = Boolean(secondByte & 0x80); + if (masked) { + if (buffer.byteLength - offset < cursor - offset + 4) { + break; + } + mask = buffer.subarray(cursor, cursor + 4); + cursor += 4; + } + + const frameEnd = cursor + payloadLength; + if (buffer.byteLength < frameEnd) { + break; + } + + let payload = buffer.subarray(cursor, frameEnd); + if (masked && payload.byteLength > 0) { + payload = Buffer.from(payload); + unmask(payload, mask); + } else if (payload.byteLength === 0) { + payload = kEmptyBuffer; + } + + this.onFrame({ + opcode: firstByte & 0x0f, + fin: Boolean(firstByte & 0x80), + masked, + compressed: Boolean(firstByte & 0x40), + payloadLength, + payload, + }); + + offset = frameEnd; + } + + this.buffer = offset === buffer.byteLength ? kEmptyBuffer : buffer.subarray(offset); + } +} + +class HttpUpgradePreludeSkipper { + constructor() { + this.done = false; + this.buffer = kEmptyBuffer; + } + + execute(chunk, callback) { + if (this.done) { + callback(chunk); + return; + } + + if (this.buffer.byteLength === 0) { + this.buffer = chunk; + } else { + this.buffer = Buffer.concat([this.buffer, chunk], this.buffer.byteLength + chunk.byteLength); + } + + const headerEnd = this.buffer.indexOf(kHttpHeaderTerminator); + if (headerEnd === -1) { + return; + } + + this.done = true; + const remaining = this.buffer.subarray(headerEnd + kHttpHeaderTerminator.byteLength); + this.buffer = kEmptyBuffer; + if (remaining.byteLength > 0) { + callback(remaining); + } + } +} + +class WebSocketObserver { + constructor({ + request, + response, + socket, + primary, + readable, + writable, + onFrameReceived, + onFrameSent, + skipHttpUpgradeResponse = false, + }) { + this.request = request; + this.response = response; + this.socket = socket; + this.primary = primary; + this.readable = readable; + this.writable = writable; + this.cleanupTargets = []; + this.closed = false; + this.cleanupBound = this.cleanup.bind(this); + this.originalPush = null; + this.originalWrite = null; + + if (typeof onFrameReceived === 'function') { + this.receivedParser = new WebSocketFrameParser((frame) => { + onFrameReceived({ + request: this.request, + response: this.response, + socket: this.socket, + ...frame, + }); + }); + } + + if (typeof onFrameSent === 'function') { + this.sentParser = new WebSocketFrameParser((frame) => { + onFrameSent({ + request: this.request, + response: this.response, + socket: this.socket, + ...frame, + }); + }); + if (skipHttpUpgradeResponse) { + this.sentPreludeSkipper = new HttpUpgradePreludeSkipper(); + } + } + } + + enable(head) { + if (this.receivedParser !== undefined && this.readable?.push) { + this.originalPush = this.readable.push; + const observer = this; + this.readable.push = function observedReadablePush(chunk, encoding) { + observer.observeReceivedChunk(chunk, encoding); + return ReflectApply(observer.originalPush, this, arguments); + }; + } + + if (this.sentParser !== undefined && this.writable?.write) { + this.originalWrite = this.writable.write; + const observer = this; + this.writable.write = function observedWritableWrite(chunk, encoding, callback) { + observer.observeSentChunk(chunk, encoding); + return ReflectApply(observer.originalWrite, this, arguments); + }; + } + + this.addCleanupTarget(this.primary); + this.addCleanupTarget(this.readable); + this.addCleanupTarget(this.writable); + this.addCleanupTarget(this.socket); + + if (this.receivedParser !== undefined && head?.byteLength > 0) { + this.receivedParser.execute(head); + } + } + + addCleanupTarget(target) { + if (target == null || + typeof target.once !== 'function' || + ArrayPrototypeIncludes(this.cleanupTargets, target)) { + return; + } + + this.cleanupTargets.push(target); + target.once('close', this.cleanupBound); + target.once('error', this.cleanupBound); + } + + observeReceivedChunk(chunk, encoding) { + const buffer = toBuffer(chunk, encoding); + if (buffer !== null) { + this.receivedParser.execute(buffer); + } + } + + observeSentChunk(chunk, encoding) { + const buffer = toBuffer(chunk, encoding); + if (buffer === null) { + return; + } + + if (this.sentPreludeSkipper !== undefined) { + this.sentPreludeSkipper.execute(buffer, (remainder) => this.sentParser.execute(remainder)); + return; + } + + this.sentParser.execute(buffer); + } + + cleanup() { + if (this.closed) { + return; + } + + this.closed = true; + + for (let i = 0; i < this.cleanupTargets.length; i++) { + const target = this.cleanupTargets[i]; + target.removeListener('close', this.cleanupBound); + target.removeListener('error', this.cleanupBound); + } + + if (this.originalPush !== null) { + this.readable.push = this.originalPush; + } + + if (this.originalWrite !== null) { + this.writable.write = this.originalWrite; + } + + this.primary[kObserverState] = undefined; + } +} + +function observeClientWebSocketUpgrade(request, response, socket, head) { + if (!onClientRequestUpgradeChannel.hasSubscribers && + !onClientFrameReceivedChannel.hasSubscribers && + !onClientFrameSentChannel.hasSubscribers) { + return; + } + + if (!isObservableClientWebSocketUpgrade(request, response)) { + return; + } + + if (onClientRequestUpgradeChannel.hasSubscribers) { + onClientRequestUpgradeChannel.publish({ + request, + response, + socket, + head, + protocol: 'websocket', + }); + } + + const observeReceived = onClientFrameReceivedChannel.hasSubscribers; + const observeSent = onClientFrameSentChannel.hasSubscribers; + + if (!observeReceived && !observeSent) { + return; + } + + if (socket[kObserverState] !== undefined) { + return; + } + + const observer = new WebSocketObserver({ + request, + response, + socket, + primary: socket, + readable: socket, + writable: socket, + onFrameReceived: observeReceived ? onClientFrameReceivedChannel.publish.bind(onClientFrameReceivedChannel) : undefined, + onFrameSent: observeSent ? onClientFrameSentChannel.publish.bind(onClientFrameSentChannel) : undefined, + }); + socket[kObserverState] = observer; + observer.enable(head); +} + +function observeServerWebSocketUpgrade(request, socketOrStream, socket, head, parseHead = true) { + if (!onServerRequestUpgradeChannel.hasSubscribers && + !onServerFrameReceivedChannel.hasSubscribers && + !onServerFrameSentChannel.hasSubscribers) { + return; + } + + if (!hasWebSocketHeader(request.headers?.upgrade)) { + return; + } + + if (onServerRequestUpgradeChannel.hasSubscribers) { + onServerRequestUpgradeChannel.publish({ + request, + socketOrStream, + head, + protocol: 'websocket', + }); + } + + const observeReceived = onServerFrameReceivedChannel.hasSubscribers; + const observeSent = onServerFrameSentChannel.hasSubscribers; + + if (!observeReceived && !observeSent) { + return; + } + + if (socketOrStream[kObserverState] !== undefined) { + return; + } + + const observer = new WebSocketObserver({ + request, + socket, + primary: socketOrStream, + readable: socketOrStream, + writable: socketOrStream, + onFrameReceived: observeReceived ? onServerFrameReceivedChannel.publish.bind(onServerFrameReceivedChannel) : undefined, + onFrameSent: observeSent ? onServerFrameSentChannel.publish.bind(onServerFrameSentChannel) : undefined, + skipHttpUpgradeResponse: observeSent, + }); + socketOrStream[kObserverState] = observer; + observer.enable(parseHead ? head : undefined); +} + +module.exports = { + hasWebSocketHeader, + observeClientWebSocketUpgrade, + observeServerWebSocketUpgrade, +}; diff --git a/lib/internal/inspector/network_http.js b/lib/internal/inspector/network_http.js index 8d324c8c544eea..26a3a89dfcaad1 100644 --- a/lib/internal/inspector/network_http.js +++ b/lib/internal/inspector/network_http.js @@ -17,9 +17,12 @@ const { sniffMimeType, } = require('internal/inspector/network'); const { Network } = require('inspector'); -const EventEmitter = require('events'); +const { Buffer } = require('buffer'); +const { hasWebSocketHeader } = require('internal/http_websocket_observer'); const kRequestUrl = Symbol('kRequestUrl'); +const kIsWebSocketUpgrade = Symbol('kIsWebSocketUpgrade'); +const kWebSocketClosed = Symbol('kWebSocketClosed'); // Convert a Headers object (Map) to a plain object (Map) const convertHeaderObject = (headers = {}) => { @@ -53,6 +56,34 @@ const convertHeaderObject = (headers = {}) => { return [dict, host, charset, mimeType]; }; +function ensureRequestMetadata(request) { + if (typeof request[kInspectorRequestId] !== 'string') { + request[kInspectorRequestId] = getNextRequestId(); + } + + if (typeof request[kRequestUrl] !== 'string') { + const { 1: host } = convertHeaderObject(request.getHeaders()); + request[kRequestUrl] = `${request.protocol}//${host}${request.path}`; + } +} + +function toWebSocketPayloadData(opcode, payload) { + if (opcode === 0x1) { + return payload.toString(); + } + return payload.toString('base64'); +} + +function toWebSocketUrl(requestUrl) { + if (requestUrl.startsWith('https://')) { + return `wss://${requestUrl.slice('https://'.length)}`; + } + if (requestUrl.startsWith('http://')) { + return `ws://${requestUrl.slice('http://'.length)}`; + } + return requestUrl; +} + /** * When a client request is created, emit Network.requestWillBeSent event. * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#event-requestWillBeSent @@ -64,6 +95,11 @@ function onClientRequestCreated({ request }) { const { 0: headers, 1: host, 2: charset } = convertHeaderObject(request.getHeaders()); const url = `${request.protocol}//${host}${request.path}`; request[kRequestUrl] = url; + request[kIsWebSocketUpgrade] = hasWebSocketHeader(request.getHeader('upgrade')); + + if (request[kIsWebSocketUpgrade]) { + return; + } Network.requestWillBeSent({ requestId: request[kInspectorRequestId], @@ -78,13 +114,88 @@ function onClientRequestCreated({ request }) { }); } +function onClientRequestUpgrade({ request, response, socket }) { + ensureRequestMetadata(request); + request[kIsWebSocketUpgrade] = true; + + const { 0: headers } = convertHeaderObject(request.getHeaders()); + Network.webSocketCreated({ + requestId: request[kInspectorRequestId], + url: toWebSocketUrl(request[kRequestUrl]), + }); + Network.webSocketWillSendHandshakeRequest({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + wallTime: DateNow(), + request: { + headers, + }, + }); + + const { 0: responseHeaders } = convertHeaderObject(response.headers); + Network.webSocketHandshakeResponseReceived({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + response: { + status: response.statusCode, + statusText: response.statusMessage ?? '', + headers: responseHeaders, + }, + }); + + socket[kWebSocketClosed] = false; + socket.once('close', () => { + if (socket[kWebSocketClosed]) { + return; + } + socket[kWebSocketClosed] = true; + Network.webSocketClosed({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + }); + }); +} + +function onWebSocketFrameSent({ request, opcode, masked, payload }) { + ensureRequestMetadata(request); + + Network.webSocketFrameSent({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + response: { + opcode, + mask: masked, + payloadData: toWebSocketPayloadData(opcode, payload), + }, + }); +} + +function onWebSocketFrameReceived({ request, opcode, masked, payload }) { + ensureRequestMetadata(request); + + Network.webSocketFrameReceived({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + response: { + opcode, + mask: masked, + payloadData: toWebSocketPayloadData(opcode, payload), + }, + }); +} + +function shouldSkipHttpRequest(request) { + return typeof request[kInspectorRequestId] !== 'string' || + request[kIsWebSocketUpgrade]; +} + /** * When a client request errors, emit Network.loadingFailed event. * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#event-loadingFailed * @param {{ request: import('http').ClientRequest, error: any }} event */ function onClientRequestError({ request, error }) { - if (typeof request[kInspectorRequestId] !== 'string') { + if (shouldSkipHttpRequest(request)) { return; } Network.loadingFailed({ @@ -95,13 +206,68 @@ function onClientRequestError({ request, error }) { }); } +/** + * When a chunk of the request body is being sent, cache it until + * `getRequestPostData` request. + * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getRequestPostData + * @param {{ request: import('http').ClientRequest, chunk: Uint8Array | string, encoding?: string }} event + */ +function onClientRequestBodyChunkSent({ request, chunk, encoding }) { + if (shouldSkipHttpRequest(request)) { + return; + } + + const buffer = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : Buffer.from(chunk); + Network.dataSent({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + dataLength: buffer.byteLength, + data: buffer, + }); +} + +/** + * Mark a request body as fully sent. + * @param {{ request: import('http').ClientRequest }} event + */ +function onClientRequestBodySent({ request }) { + if (shouldSkipHttpRequest(request)) { + return; + } + + Network.dataSent({ + requestId: request[kInspectorRequestId], + finished: true, + }); +} + +/** + * When a chunk of the response body is received, cache the raw bytes until + * `getResponseBody` request. + * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#method-getResponseBody + * @param {{ request: import('http').ClientRequest, chunk: Uint8Array }} event + */ +function onClientResponseBodyChunkReceived({ request, chunk }) { + if (shouldSkipHttpRequest(request)) { + return; + } + + Network.dataReceived({ + requestId: request[kInspectorRequestId], + timestamp: getMonotonicTime(), + dataLength: chunk.byteLength, + encodedDataLength: chunk.byteLength, + data: chunk, + }); +} + /** * When response headers are received, emit Network.responseReceived event. * https://chromedevtools.github.io/devtools-protocol/1-3/Network/#event-responseReceived * @param {{ request: import('http').ClientRequest, error: any }} event */ function onClientResponseFinish({ request, response }) { - if (typeof request[kInspectorRequestId] !== 'string') { + if (shouldSkipHttpRequest(request)) { return; } @@ -121,17 +287,6 @@ function onClientResponseFinish({ request, response }) { }, }); - // Unlike response.on('data', ...), this does not put the stream into flowing mode. - EventEmitter.prototype.on.call(response, 'data', (chunk) => { - Network.dataReceived({ - requestId: request[kInspectorRequestId], - timestamp: getMonotonicTime(), - dataLength: chunk.byteLength, - encodedDataLength: chunk.byteLength, - data: chunk, - }); - }); - // Wait until the response body is consumed by user code. response.once('end', () => { Network.loadingFinished({ @@ -143,6 +298,12 @@ function onClientResponseFinish({ request, response }) { module.exports = registerDiagnosticChannels([ ['http.client.request.created', onClientRequestCreated], + ['http.client.request.upgrade', onClientRequestUpgrade], + ['http.client.request.bodyChunkSent', onClientRequestBodyChunkSent], + ['http.client.request.bodySent', onClientRequestBodySent], ['http.client.request.error', onClientRequestError], + ['http.client.response.bodyChunkReceived', onClientResponseBodyChunkReceived], ['http.client.response.finish', onClientResponseFinish], + ['websocket.client.frameSent', onWebSocketFrameSent], + ['websocket.client.frameReceived', onWebSocketFrameReceived], ]); diff --git a/src/inspector/domain_network.pdl b/src/inspector/domain_network.pdl index fdfd9ebe1fdeb4..c6a60ba2360f99 100644 --- a/src/inspector/domain_network.pdl +++ b/src/inspector/domain_network.pdl @@ -96,6 +96,22 @@ experimental domain Network # HTTP response headers. Headers headers + # WebSocket request data. + type WebSocketRequest extends object + properties + # HTTP request headers. + Headers headers + + # WebSocket frame data. + type WebSocketFrame extends object + properties + # WebSocket message opcode. + number opcode + # WebSocket message mask. + boolean mask + # WebSocket message payload data. + string payloadData + # Disables network tracking, prevents network events from being sent to the client. command disable @@ -210,6 +226,17 @@ experimental domain Network string url # Request initiator. Initiator initiator + # Fired when WebSocket handshake request is about to be sent. + event webSocketWillSendHandshakeRequest + parameters + # Request identifier. + RequestId requestId + # Timestamp. + MonotonicTime timestamp + # Timestamp. + TimeSinceEpoch wallTime + # WebSocket request data. + WebSocketRequest request # Fired when WebSocket is closed. event webSocketClosed parameters @@ -226,3 +253,21 @@ experimental domain Network MonotonicTime timestamp # WebSocket response data. WebSocketResponse response + # Fired when WebSocket message is received. + event webSocketFrameReceived + parameters + # Request identifier. + RequestId requestId + # Timestamp. + MonotonicTime timestamp + # WebSocket response data. + WebSocketFrame response + # Fired when WebSocket message is sent. + event webSocketFrameSent + parameters + # Request identifier. + RequestId requestId + # Timestamp. + MonotonicTime timestamp + # WebSocket response data. + WebSocketFrame response diff --git a/src/inspector/network_agent.cc b/src/inspector/network_agent.cc index ace8ba52287186..cebfb85eee2604 100644 --- a/src/inspector/network_agent.cc +++ b/src/inspector/network_agent.cc @@ -150,6 +150,27 @@ NetworkAgent::createResponseFromObject(v8::Local context, .build(); } +std::unique_ptr +NetworkAgent::createWebSocketRequest(v8::Local context, + Local request) { + HandleScope handle_scope(Isolate::GetCurrent()); + Isolate* isolate = env_->isolate(); + Local headers_obj; + if (!ObjectGetObject(context, request, "headers").ToLocal(&headers_obj)) { + ThrowEventError(isolate, "Missing request.headers in event"); + return {}; + } + std::unique_ptr headers = + createHeadersFromObject(context, headers_obj); + if (!headers) { + return {}; + } + + return protocol::Network::WebSocketRequest::create() + .setHeaders(std::move(headers)) + .build(); +} + std::unique_ptr NetworkAgent::createWebSocketResponse(v8::Local context, Local response) { @@ -184,6 +205,35 @@ NetworkAgent::createWebSocketResponse(v8::Local context, .build(); } +std::unique_ptr +NetworkAgent::createWebSocketFrame(v8::Local context, + Local response) { + HandleScope handle_scope(Isolate::GetCurrent()); + Isolate* isolate = env_->isolate(); + double opcode; + if (!ObjectGetDouble(context, response, "opcode").To(&opcode)) { + ThrowEventError(isolate, "Missing response.opcode in event"); + return {}; + } + bool mask; + if (!ObjectGetBool(context, response, "mask").To(&mask)) { + ThrowEventError(isolate, "Missing response.mask in event"); + return {}; + } + protocol::String payload_data; + if (!ObjectGetProtocolString(context, response, "payloadData") + .To(&payload_data)) { + ThrowEventError(isolate, "Missing response.payloadData in event"); + return {}; + } + + return protocol::Network::WebSocketFrame::create() + .setOpcode(opcode) + .setMask(mask) + .setPayloadData(payload_data) + .build(); +} + NetworkAgent::NetworkAgent( NetworkInspector* inspector, v8_inspector::V8Inspector* v8_inspector, @@ -201,9 +251,14 @@ NetworkAgent::NetworkAgent( event_notifier_map_["dataSent"] = &NetworkAgent::dataSent; event_notifier_map_["dataReceived"] = &NetworkAgent::dataReceived; event_notifier_map_["webSocketCreated"] = &NetworkAgent::webSocketCreated; + event_notifier_map_["webSocketWillSendHandshakeRequest"] = + &NetworkAgent::webSocketWillSendHandshakeRequest; event_notifier_map_["webSocketClosed"] = &NetworkAgent::webSocketClosed; event_notifier_map_["webSocketHandshakeResponseReceived"] = &NetworkAgent::webSocketHandshakeResponseReceived; + event_notifier_map_["webSocketFrameReceived"] = + &NetworkAgent::webSocketFrameReceived; + event_notifier_map_["webSocketFrameSent"] = &NetworkAgent::webSocketFrameSent; } void NetworkAgent::webSocketCreated(v8::Local context, @@ -228,6 +283,37 @@ void NetworkAgent::webSocketCreated(v8::Local context, frontend_->webSocketCreated(request_id, url, std::move(initiator)); } +void NetworkAgent::webSocketWillSendHandshakeRequest( + v8::Local context, v8::Local params) { + Isolate* isolate = env_->isolate(); + protocol::String request_id; + if (!ObjectGetProtocolString(context, params, "requestId").To(&request_id)) { + ThrowEventError(isolate, "Missing requestId in event"); + return; + } + double timestamp; + if (!ObjectGetDouble(context, params, "timestamp").To(×tamp)) { + ThrowEventError(isolate, "Missing timestamp in event"); + return; + } + double wall_time; + if (!ObjectGetDouble(context, params, "wallTime").To(&wall_time)) { + ThrowEventError(isolate, "Missing wallTime in event"); + return; + } + Local request_obj; + if (!ObjectGetObject(context, params, "request").ToLocal(&request_obj)) { + ThrowEventError(isolate, "Missing request in event"); + return; + } + auto request = createWebSocketRequest(context, request_obj); + if (!request) { + return; + } + frontend_->webSocketWillSendHandshakeRequest( + request_id, timestamp, wall_time, std::move(request)); +} + void NetworkAgent::webSocketClosed(v8::Local context, v8::Local params) { Isolate* isolate = env_->isolate(); @@ -270,6 +356,56 @@ void NetworkAgent::webSocketHandshakeResponseReceived( request_id, timestamp, std::move(response)); } +void NetworkAgent::webSocketFrameReceived(v8::Local context, + v8::Local params) { + Isolate* isolate = env_->isolate(); + protocol::String request_id; + if (!ObjectGetProtocolString(context, params, "requestId").To(&request_id)) { + ThrowEventError(isolate, "Missing requestId in event"); + return; + } + double timestamp; + if (!ObjectGetDouble(context, params, "timestamp").To(×tamp)) { + ThrowEventError(isolate, "Missing timestamp in event"); + return; + } + Local response_obj; + if (!ObjectGetObject(context, params, "response").ToLocal(&response_obj)) { + ThrowEventError(isolate, "Missing response in event"); + return; + } + auto response = createWebSocketFrame(context, response_obj); + if (!response) { + return; + } + frontend_->webSocketFrameReceived(request_id, timestamp, std::move(response)); +} + +void NetworkAgent::webSocketFrameSent(v8::Local context, + v8::Local params) { + Isolate* isolate = env_->isolate(); + protocol::String request_id; + if (!ObjectGetProtocolString(context, params, "requestId").To(&request_id)) { + ThrowEventError(isolate, "Missing requestId in event"); + return; + } + double timestamp; + if (!ObjectGetDouble(context, params, "timestamp").To(×tamp)) { + ThrowEventError(isolate, "Missing timestamp in event"); + return; + } + Local response_obj; + if (!ObjectGetObject(context, params, "response").ToLocal(&response_obj)) { + ThrowEventError(isolate, "Missing response in event"); + return; + } + auto response = createWebSocketFrame(context, response_obj); + if (!response) { + return; + } + frontend_->webSocketFrameSent(request_id, timestamp, std::move(response)); +} + void NetworkAgent::emitNotification(v8::Local context, const protocol::String& event, v8::Local params) { diff --git a/src/inspector/network_agent.h b/src/inspector/network_agent.h index 2136a45baf45f6..dc67033643e9a8 100644 --- a/src/inspector/network_agent.h +++ b/src/inspector/network_agent.h @@ -73,10 +73,16 @@ class NetworkAgent : public protocol::Network::Backend { void webSocketCreated(v8::Local context, v8::Local params); + void webSocketWillSendHandshakeRequest(v8::Local context, + v8::Local params); void webSocketClosed(v8::Local context, v8::Local params); void webSocketHandshakeResponseReceived(v8::Local context, v8::Local params); + void webSocketFrameReceived(v8::Local context, + v8::Local params); + void webSocketFrameSent(v8::Local context, + v8::Local params); private: std::unique_ptr createHeadersFromObject( @@ -85,8 +91,12 @@ class NetworkAgent : public protocol::Network::Backend { v8::Local context, v8::Local request); std::unique_ptr createResponseFromObject( v8::Local context, v8::Local response); + std::unique_ptr createWebSocketRequest( + v8::Local context, v8::Local request); std::unique_ptr createWebSocketResponse( v8::Local context, v8::Local response); + std::unique_ptr createWebSocketFrame( + v8::Local context, v8::Local response); NetworkInspector* inspector_; v8_inspector::V8Inspector* v8_inspector_; diff --git a/test/parallel/test-diagnostics-channel-http-server-upgrade-websocket.js b/test/parallel/test-diagnostics-channel-http-server-upgrade-websocket.js new file mode 100644 index 00000000000000..eab45d7a759920 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http-server-upgrade-websocket.js @@ -0,0 +1,274 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http = require('http'); +const net = require('net'); +const stream = require('stream'); +const { once } = require('events'); + +function encodeFrame(opcode, payload, { + fin = true, + masked = false, + mask = Buffer.from([1, 2, 3, 4]), +} = {}) { + payload = typeof payload === 'string' ? Buffer.from(payload) : payload; + + const header = [ + (fin ? 0x80 : 0) | opcode, + ]; + + if (payload.byteLength < 126) { + header.push((masked ? 0x80 : 0) | payload.byteLength); + } else { + throw new Error('payload too large for test'); + } + + if (!masked) { + return Buffer.concat([Buffer.from(header), payload]); + } + + const maskedPayload = Buffer.from(payload); + for (let i = 0; i < maskedPayload.byteLength; i++) { + maskedPayload[i] ^= mask[i % 4]; + } + + return Buffer.concat([Buffer.from(header), mask, maskedPayload]); +} + +async function runClient(port, requestHead, { + initialTail, + secondWrite, + closeFrame, +}) { + const socket = net.createConnection({ + host: '127.0.0.1', + port, + }); + + let responseBuffer = Buffer.alloc(0); + let responseReceived = false; + let secondWriteSent = secondWrite === undefined; + + function maybeClose() { + if (responseReceived && secondWriteSent) { + socket.end(closeFrame); + } + } + + socket.on('data', (chunk) => { + responseBuffer = Buffer.concat([responseBuffer, chunk]); + if (!responseReceived && responseBuffer.includes('\r\n\r\n')) { + responseReceived = true; + maybeClose(); + } + }); + + socket.on('error', common.mustNotCall()); + await once(socket, 'connect'); + + const headers = Buffer.from(requestHead, 'latin1'); + + if (initialTail !== undefined) { + socket.write(Buffer.concat([headers, initialTail])); + } else { + socket.write(headers); + } + + if (secondWrite !== undefined) { + setImmediate(() => { + socket.write(secondWrite); + secondWriteSent = true; + maybeClose(); + }); + } + + await once(socket, 'close'); +} + +const serverTextFrame = encodeFrame(0x1, 'server'); +const completeHeadFrame = encodeFrame(0x1, 'complete', { + masked: true, + mask: Buffer.from([5, 6, 7, 8]), +}); +const incompleteBodyFrame = encodeFrame(0x1, 'incomplete', { + masked: true, + mask: Buffer.from([9, 10, 11, 12]), +}); +const closeFrame = encodeFrame(0x8, Buffer.alloc(0), { + masked: true, + mask: Buffer.from([13, 14, 15, 16]), +}); + +const upgradeEvents = []; +const receivedFrames = []; +const sentFrames = []; + +dc.subscribe('http.server.request.upgrade', common.mustCall((message) => { + upgradeEvents.push({ + url: message.request.url, + socketOrStream: message.socketOrStream, + head: Buffer.from(message.head), + protocol: message.protocol, + }); +}, 2)); + +dc.subscribe('websocket.server.frameReceived', common.mustCall((message) => { + receivedFrames.push({ + url: message.request.url, + socket: message.socket, + opcode: message.opcode, + fin: message.fin, + masked: message.masked, + compressed: message.compressed, + payloadLength: message.payloadLength, + payload: Buffer.from(message.payload), + }); +}, 4)); + +dc.subscribe('websocket.server.frameSent', common.mustCall((message) => { + sentFrames.push({ + url: message.request.url, + socket: message.socket, + opcode: message.opcode, + fin: message.fin, + masked: message.masked, + compressed: message.compressed, + payloadLength: message.payloadLength, + payload: Buffer.from(message.payload), + }); +}, 2)); + +const server = http.createServer(common.mustNotCall()); +server.on('upgrade', common.mustCall((request, socketOrStream) => { + socketOrStream.on('error', common.mustNotCall()); + socketOrStream.resume(); + socketOrStream.write( + 'HTTP/1.1 101 Switching Protocols\r\n' + + 'Connection: Upgrade\r\n' + + 'Upgrade: websocket\r\n' + + '\r\n', + ); + socketOrStream.write(serverTextFrame); + socketOrStream.on('end', () => socketOrStream.end()); +}, 2)); + +server.listen(0, '127.0.0.1', common.mustCall(async () => { + const port = server.address().port; + + await runClient(port, + 'POST /complete HTTP/1.1\r\n' + + 'Host: localhost\r\n' + + 'Connection: Upgrade\r\n' + + 'Upgrade: websocket\r\n' + + 'Content-Length: 0\r\n' + + '\r\n', { + initialTail: completeHeadFrame, + closeFrame, + }); + + await runClient(port, + 'POST /incomplete HTTP/1.1\r\n' + + 'Host: localhost\r\n' + + 'Connection: Upgrade\r\n' + + 'Upgrade: websocket\r\n' + + 'Transfer-Encoding: chunked\r\n' + + 'Trailer: X-Test\r\n' + + '\r\n', { + secondWrite: Buffer.concat([ + Buffer.from('4\r\nbody\r\n0\r\nX-Test: yes\r\n\r\n', 'latin1'), + incompleteBodyFrame, + ]), + closeFrame, + }); + + server.close(); +})); + +process.on('exit', () => { + assert.deepStrictEqual(upgradeEvents.map(({ url, protocol }) => ({ url, protocol })), [ + { url: '/complete', protocol: 'websocket' }, + { url: '/incomplete', protocol: 'websocket' }, + ]); + assert.strictEqual(upgradeEvents[0].socketOrStream instanceof net.Socket, true); + assert.strictEqual(upgradeEvents[1].socketOrStream instanceof stream.Duplex, true); + assert.deepStrictEqual(upgradeEvents[0].head, completeHeadFrame); + assert.ok(upgradeEvents[1].head.byteLength >= 0); + + assert.deepStrictEqual(receivedFrames.map((frame) => ({ + url: frame.url, + opcode: frame.opcode, + fin: frame.fin, + masked: frame.masked, + compressed: frame.compressed, + payloadLength: frame.payloadLength, + payload: frame.payload, + })), [ + { + url: '/complete', + opcode: 0x1, + fin: true, + masked: true, + compressed: false, + payloadLength: 8, + payload: Buffer.from('complete'), + }, + { + url: '/complete', + opcode: 0x8, + fin: true, + masked: true, + compressed: false, + payloadLength: 0, + payload: Buffer.alloc(0), + }, + { + url: '/incomplete', + opcode: 0x1, + fin: true, + masked: true, + compressed: false, + payloadLength: 10, + payload: Buffer.from('incomplete'), + }, + { + url: '/incomplete', + opcode: 0x8, + fin: true, + masked: true, + compressed: false, + payloadLength: 0, + payload: Buffer.alloc(0), + }, + ]); + + assert.deepStrictEqual(sentFrames.map((frame) => ({ + url: frame.url, + opcode: frame.opcode, + fin: frame.fin, + masked: frame.masked, + compressed: frame.compressed, + payloadLength: frame.payloadLength, + payload: frame.payload, + })), [ + { + url: '/complete', + opcode: 0x1, + fin: true, + masked: false, + compressed: false, + payloadLength: 6, + payload: Buffer.from('server'), + }, + { + url: '/incomplete', + opcode: 0x1, + fin: true, + masked: false, + compressed: false, + payloadLength: 6, + payload: Buffer.from('server'), + }, + ]); +}); diff --git a/test/parallel/test-diagnostics-channel-http-upgrade-websocket.js b/test/parallel/test-diagnostics-channel-http-upgrade-websocket.js new file mode 100644 index 00000000000000..3a8f8cab3a719e --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http-upgrade-websocket.js @@ -0,0 +1,245 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const dc = require('diagnostics_channel'); +const http = require('http'); +const net = require('net'); + +function encodeFrame(opcode, payload, { + fin = true, + masked = false, + mask = Buffer.from([1, 2, 3, 4]), +} = {}) { + payload = typeof payload === 'string' ? Buffer.from(payload) : payload; + + const header = [ + (fin ? 0x80 : 0) | opcode, + ]; + + if (payload.byteLength < 126) { + header.push((masked ? 0x80 : 0) | payload.byteLength); + } else { + throw new Error('payload too large for test'); + } + + if (!masked) { + return Buffer.concat([Buffer.from(header), payload]); + } + + const maskedPayload = Buffer.from(payload); + for (let i = 0; i < maskedPayload.byteLength; i++) { + maskedPayload[i] ^= mask[i % 4]; + } + + return Buffer.concat([Buffer.from(header), mask, maskedPayload]); +} + +const serverTextFrame = encodeFrame(0x1, 'hello'); +const serverPingFrame = encodeFrame(0x9, Buffer.from('!')); +const serverBinaryFrame = encodeFrame(0x2, Buffer.from([1, 2, 3])); + +const clientTextFrame = encodeFrame(0x1, 'hel', { + fin: false, + masked: true, + mask: Buffer.from([5, 6, 7, 8]), +}); +const clientContinuationFrame = encodeFrame(0x0, 'lo', { + masked: true, + mask: Buffer.from([9, 10, 11, 12]), +}); +const clientCloseFrame = encodeFrame(0x8, Buffer.alloc(0), { + masked: true, + mask: Buffer.from([13, 14, 15, 16]), +}); + +let request; +let upgradeEvent; +let upgradedSocket; +let upgradeResponse; + +const receivedMessages = []; +const sentMessages = []; +const receivedFrames = []; +const sentFrames = []; + +dc.subscribe('http.client.request.upgrade', common.mustCall((message) => { + upgradeEvent = message; + assert.strictEqual(message.request, request); + assert.strictEqual(message.protocol, 'websocket'); + assert.ok(message.head instanceof Buffer); +}, 1)); + +dc.subscribe('websocket.client.frameReceived', common.mustCall((message) => { + assert.strictEqual(message.request, request); + receivedMessages.push(message); + receivedFrames.push({ + opcode: message.opcode, + fin: message.fin, + masked: message.masked, + compressed: message.compressed, + payloadLength: message.payloadLength, + payload: Buffer.from(message.payload), + }); +}, 3)); + +dc.subscribe('websocket.client.frameSent', common.mustCall((message) => { + assert.strictEqual(message.request, request); + sentMessages.push(message); + sentFrames.push({ + opcode: message.opcode, + fin: message.fin, + masked: message.masked, + compressed: message.compressed, + payloadLength: message.payloadLength, + payload: Buffer.from(message.payload), + }); +}, 3)); + +const server = net.createServer(common.mustCall((socket) => { + let requestBuffer = Buffer.alloc(0); + + socket.on('data', (chunk) => { + requestBuffer = Buffer.concat([requestBuffer, chunk]); + if (!requestBuffer.includes('\r\n\r\n')) { + return; + } + + socket.removeAllListeners('data'); + + const response = Buffer.from( + 'HTTP/1.1 101 Switching Protocols\r\n' + + 'Connection: Upgrade\r\n' + + 'Upgrade: websocket\r\n' + + '\r\n', + 'latin1', + ); + + socket.write(Buffer.concat([ + response, + serverTextFrame.subarray(0, 4), + ])); + + setImmediate(() => { + socket.write(Buffer.concat([ + serverTextFrame.subarray(4), + serverPingFrame, + serverBinaryFrame, + ])); + + setTimeout(() => socket.end(), 20); + }); + }); +}), 1); + +server.listen(0, common.mustCall(() => { + request = http.get({ + port: server.address().port, + headers: { + Connection: 'Upgrade', + Upgrade: 'websocket', + }, + }); + + request.on('error', common.mustNotCall()); + request.on('upgrade', common.mustCall((response, socket) => { + upgradeResponse = response; + upgradedSocket = socket; + + socket.on('error', common.mustNotCall()); + socket.resume(); + + socket.write(clientTextFrame.subarray(0, 2)); + socket.write(clientTextFrame.subarray(2)); + socket.write(Buffer.concat([ + clientContinuationFrame, + clientCloseFrame, + ])); + + socket.on('close', common.mustCall(() => { + assert.strictEqual(upgradeEvent.response, upgradeResponse); + assert.strictEqual(upgradeEvent.socket, upgradedSocket); + assert.ok(upgradeEvent.head.byteLength >= 4); + assert.deepStrictEqual( + upgradeEvent.head.subarray(0, 4), + serverTextFrame.subarray(0, 4), + ); + assert.deepStrictEqual(receivedMessages.map((message) => message.response), [ + upgradeResponse, + upgradeResponse, + upgradeResponse, + ]); + assert.deepStrictEqual(receivedMessages.map((message) => message.socket), [ + upgradedSocket, + upgradedSocket, + upgradedSocket, + ]); + assert.deepStrictEqual(sentMessages.map((message) => message.response), [ + upgradeResponse, + upgradeResponse, + upgradeResponse, + ]); + assert.deepStrictEqual(sentMessages.map((message) => message.socket), [ + upgradedSocket, + upgradedSocket, + upgradedSocket, + ]); + + assert.deepStrictEqual(receivedFrames, [ + { + opcode: 0x1, + fin: true, + masked: false, + compressed: false, + payloadLength: 5, + payload: Buffer.from('hello'), + }, + { + opcode: 0x9, + fin: true, + masked: false, + compressed: false, + payloadLength: 1, + payload: Buffer.from('!'), + }, + { + opcode: 0x2, + fin: true, + masked: false, + compressed: false, + payloadLength: 3, + payload: Buffer.from([1, 2, 3]), + }, + ]); + + assert.deepStrictEqual(sentFrames, [ + { + opcode: 0x1, + fin: false, + masked: true, + compressed: false, + payloadLength: 3, + payload: Buffer.from('hel'), + }, + { + opcode: 0x0, + fin: true, + masked: true, + compressed: false, + payloadLength: 2, + payload: Buffer.from('lo'), + }, + { + opcode: 0x8, + fin: true, + masked: true, + compressed: false, + payloadLength: 0, + payload: Buffer.alloc(0), + }, + ]); + + server.close(); + })); + })); +})); diff --git a/test/parallel/test-inspector-emit-protocol-event-errors.js b/test/parallel/test-inspector-emit-protocol-event-errors.js index 1a76a491c2195c..5940dcd9ce22bf 100644 --- a/test/parallel/test-inspector-emit-protocol-event-errors.js +++ b/test/parallel/test-inspector-emit-protocol-event-errors.js @@ -78,6 +78,18 @@ function webSocketClosed(overrides = {}) { }; } +function webSocketRequest(overrides = {}) { + return { + requestId: 'websocket-request-id', + timestamp: 1000, + wallTime: 1000, + request: { + headers: {}, + }, + ...overrides, + }; +} + function webSocketResponse(overrides = {}) { return { requestId: 'websocket-response-id', @@ -91,6 +103,19 @@ function webSocketResponse(overrides = {}) { }; } +function webSocketFrame(overrides = {}) { + return { + requestId: 'websocket-frame-id', + timestamp: 1000, + response: { + opcode: 1, + mask: false, + payloadData: 'hello', + }, + ...overrides, + }; +} + function storageId(overrides = {}) { return { securityOrigin: '', @@ -257,6 +282,34 @@ const NETWORK_ERROR_CASES = [ ], ['webSocketCreated', omit(webSocketCreated(), 'url'), 'Missing url in event'], + [ + 'webSocketWillSendHandshakeRequest', + omit(webSocketRequest(), 'requestId'), + 'Missing requestId in event', + ], + [ + 'webSocketWillSendHandshakeRequest', + omit(webSocketRequest(), 'timestamp'), + 'Missing timestamp in event', + ], + [ + 'webSocketWillSendHandshakeRequest', + omit(webSocketRequest(), 'wallTime'), + 'Missing wallTime in event', + ], + [ + 'webSocketWillSendHandshakeRequest', + omit(webSocketRequest(), 'request'), + 'Missing request in event', + ], + [ + 'webSocketWillSendHandshakeRequest', + webSocketRequest({ + request: omit(webSocketRequest().request, 'headers'), + }), + 'Missing request.headers in event', + ], + [ 'webSocketClosed', omit(webSocketClosed(), 'requestId'), @@ -304,6 +357,79 @@ const NETWORK_ERROR_CASES = [ }), 'Missing response.headers in event', ], + + [ + 'webSocketFrameReceived', + omit(webSocketFrame(), 'requestId'), + 'Missing requestId in event', + ], + [ + 'webSocketFrameReceived', + omit(webSocketFrame(), 'timestamp'), + 'Missing timestamp in event', + ], + [ + 'webSocketFrameReceived', + omit(webSocketFrame(), 'response'), + 'Missing response in event', + ], + [ + 'webSocketFrameReceived', + webSocketFrame({ + response: omit(webSocketFrame().response, 'opcode'), + }), + 'Missing response.opcode in event', + ], + [ + 'webSocketFrameReceived', + webSocketFrame({ + response: omit(webSocketFrame().response, 'mask'), + }), + 'Missing response.mask in event', + ], + [ + 'webSocketFrameReceived', + webSocketFrame({ + response: omit(webSocketFrame().response, 'payloadData'), + }), + 'Missing response.payloadData in event', + ], + [ + 'webSocketFrameSent', + omit(webSocketFrame(), 'requestId'), + 'Missing requestId in event', + ], + [ + 'webSocketFrameSent', + omit(webSocketFrame(), 'timestamp'), + 'Missing timestamp in event', + ], + [ + 'webSocketFrameSent', + omit(webSocketFrame(), 'response'), + 'Missing response in event', + ], + [ + 'webSocketFrameSent', + webSocketFrame({ + response: omit(webSocketFrame().response, 'opcode'), + }), + 'Missing response.opcode in event', + ], + [ + 'webSocketFrameSent', + webSocketFrame({ + response: omit(webSocketFrame().response, 'mask'), + }), + 'Missing response.mask in event', + ], + [ + 'webSocketFrameSent', + webSocketFrame({ + response: omit(webSocketFrame().response, 'payloadData'), + }), + 'Missing response.payloadData in event', + ], ]; const DOM_STORAGE_ERROR_CASES = [ diff --git a/test/parallel/test-inspector-emit-protocol-event.js b/test/parallel/test-inspector-emit-protocol-event.js index 567c92e3eeba6a..36f541157b7547 100644 --- a/test/parallel/test-inspector-emit-protocol-event.js +++ b/test/parallel/test-inspector-emit-protocol-event.js @@ -96,6 +96,17 @@ const EXPECTED_EVENTS = { } }, + { + name: 'webSocketWillSendHandshakeRequest', + params: { + requestId: 'websocket-id-1', + timestamp: 1000, + wallTime: 1000, + request: { + headers: { upgrade: 'websocket' }, + }, + } + }, { name: 'webSocketHandshakeResponseReceived', params: { @@ -116,6 +127,30 @@ const EXPECTED_EVENTS = { } }, + { + name: 'webSocketFrameReceived', + params: { + requestId: 'websocket-id-1', + timestamp: 1000, + response: { + opcode: 1, + mask: false, + payloadData: 'hello', + }, + } + }, + { + name: 'webSocketFrameSent', + params: { + requestId: 'websocket-id-1', + timestamp: 1000, + response: { + opcode: 1, + mask: true, + payloadData: 'hello', + }, + } + }, ], DOMStorage: [ { diff --git a/test/parallel/test-inspector-network-http-upgrade-websocket.js b/test/parallel/test-inspector-network-http-upgrade-websocket.js new file mode 100644 index 00000000000000..039f5deb722890 --- /dev/null +++ b/test/parallel/test-inspector-network-http-upgrade-websocket.js @@ -0,0 +1,151 @@ +// Flags: --inspect=0 --experimental-network-inspection +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +common.skipIfInspectorDisabled(); + +const assert = require('node:assert'); +const { once } = require('node:events'); +const http = require('node:http'); +const inspector = require('node:inspector/promises'); +const WebSocketServer = require('../common/websocket-server'); + +function encodeFrame(opcode, payload, { + masked = false, + mask = Buffer.from([1, 2, 3, 4]), +} = {}) { + payload = typeof payload === 'string' ? Buffer.from(payload) : payload; + + const header = [ + 0x80 | opcode, + ]; + + if (payload.byteLength < 126) { + header.push((masked ? 0x80 : 0) | payload.byteLength); + } else { + throw new Error('payload too large for test'); + } + + if (!masked) { + return Buffer.concat([Buffer.from(header), payload]); + } + + const maskedPayload = Buffer.from(payload); + for (let i = 0; i < maskedPayload.byteLength; i++) { + maskedPayload[i] ^= mask[i % 4]; + } + + return Buffer.concat([Buffer.from(header), mask, maskedPayload]); +} + +const session = new inspector.Session(); +session.connect(); + +async function test() { + await session.post('Network.enable'); + + const server = new WebSocketServer({}); + await server.start(); + + const host = '127.0.0.1'; + const wsUrl = `ws://${host}:${server.port}/`; + const httpEvents = { + requestWillBeSent: [], + responseReceived: [], + loadingFinished: [], + }; + + session.on('Network.requestWillBeSent', (message) => { + httpEvents.requestWillBeSent.push(message); + }); + session.on('Network.responseReceived', (message) => { + httpEvents.responseReceived.push(message); + }); + session.on('Network.loadingFinished', (message) => { + httpEvents.loadingFinished.push(message); + }); + + let requestId; + const created = once(session, 'Network.webSocketCreated').then(common.mustCall(([message]) => { + assert.strictEqual(message.method, 'Network.webSocketCreated'); + assert.strictEqual(message.params.url, wsUrl); + assert.strictEqual(typeof message.params.requestId, 'string'); + requestId = message.params.requestId; + })); + + const willSend = once(session, 'Network.webSocketWillSendHandshakeRequest').then(common.mustCall(([message]) => { + assert.strictEqual(message.params.requestId, requestId); + assert.strictEqual(typeof message.params.timestamp, 'number'); + assert.strictEqual(typeof message.params.wallTime, 'number'); + assert.strictEqual(message.params.request.headers.upgrade, 'websocket'); + assert.strictEqual(message.params.request.headers.connection, 'Upgrade'); + assert.strictEqual(message.params.request.headers['sec-websocket-version'], '13'); + assert.ok(message.params.request.headers['sec-websocket-key']); + })); + + const handshake = once(session, 'Network.webSocketHandshakeResponseReceived').then(common.mustCall(([message]) => { + assert.strictEqual(message.params.requestId, requestId); + assert.strictEqual(message.params.response.status, 101); + assert.strictEqual(message.params.response.statusText, 'Switching Protocols'); + assert.strictEqual(message.params.response.headers.upgrade, 'websocket'); + assert.strictEqual(message.params.response.headers.connection, 'Upgrade'); + assert.strictEqual(typeof message.params.timestamp, 'number'); + })); + + const frameSent = once(session, 'Network.webSocketFrameSent').then(common.mustCall(([message]) => { + assert.strictEqual(message.params.requestId, requestId); + assert.strictEqual(message.params.response.opcode, 1); + assert.strictEqual(message.params.response.mask, true); + assert.strictEqual(message.params.response.payloadData, 'hello'); + assert.strictEqual(typeof message.params.timestamp, 'number'); + })); + + const frameReceived = once(session, 'Network.webSocketFrameReceived').then(common.mustCall(([message]) => { + assert.strictEqual(message.params.requestId, requestId); + assert.strictEqual(message.params.response.opcode, 1); + assert.strictEqual(message.params.response.mask, false); + assert.strictEqual(message.params.response.payloadData, 'Hello from server!'); + assert.strictEqual(typeof message.params.timestamp, 'number'); + })); + + const closed = once(session, 'Network.webSocketClosed').then(common.mustCall(([message]) => { + assert.strictEqual(message.method, 'Network.webSocketClosed'); + assert.strictEqual(message.params.requestId, requestId); + assert.strictEqual(typeof message.params.timestamp, 'number'); + })); + + const req = http.get({ + host, + port: server.port, + headers: { + Connection: 'Upgrade', + Upgrade: 'websocket', + 'Sec-WebSocket-Key': 'dGhlIHNhbXBsZSBub25jZQ==', + 'Sec-WebSocket-Version': '13', + }, + }); + req.on('error', common.mustNotCall()); + req.on('upgrade', common.mustCall((response, socket, head) => { + assert.ok(head instanceof Buffer); + assert.strictEqual(response.statusCode, 101); + socket.on('error', common.mustNotCall()); + socket.resume(); + socket.once('data', common.mustCall(() => { + socket.end(encodeFrame(0x8, Buffer.alloc(0), { masked: true })); + })); + socket.write(encodeFrame(0x1, 'hello', { masked: true })); + })); + + await Promise.all([created, willSend, handshake, frameSent, frameReceived, closed]); + assert.deepStrictEqual(httpEvents, { + requestWillBeSent: [], + responseReceived: [], + loadingFinished: [], + }); + + session.disconnect(); + server.server.close(); +} + +test().then(common.mustCall());