Skip to content

Commit 6c3577d

Browse files
authored
fix: ensure websockets are correctly passed (#65759)
Further enhancing the typings across the codebase, this resolves some errors discovered while running tests. During development, previously, if the websocket request was forwarded down to the route resolver, it would fail. This is because a `Duplex` stream is not a `ServerResponse`. I opted to use the `MockedResponse` here to ensure the remaining code didn't change, as we're only using the resolve routes code to identify a match rather than actually sending the response on. The response data is sent later with the `proxyRequest` which here does have support for `Duplex` streams.
1 parent 337666b commit 6c3577d

File tree

2 files changed

+94
-72
lines changed

2 files changed

+94
-72
lines changed

packages/next/src/server/lib/router-server.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import { ensureLeadingSlash } from '../../shared/lib/page-path/ensure-leading-sl
4040
import { getNextPathnameInfo } from '../../shared/lib/router/utils/get-next-pathname-info'
4141
import { getHostname } from '../../shared/lib/get-hostname'
4242
import { detectDomainLocale } from '../../shared/lib/i18n/detect-domain-locale'
43+
import { MockedResponse } from './mock-request'
4344

4445
const debug = setupDebug('next:router-server:main')
4546
const isNextFont = (pathname: string | null) =>
@@ -660,9 +661,16 @@ export async function initialize(opts: {
660661
}
661662
}
662663

664+
const res = new MockedResponse({
665+
resWriter: () => {
666+
throw new Error(
667+
'Invariant: did not expect response writer to be written to for upgrade request'
668+
)
669+
},
670+
})
663671
const { matchedOutput, parsedUrl } = await resolveRoutes({
664672
req,
665-
res: socket as any,
673+
res,
666674
isUpgradeReq: true,
667675
signal: signalFromNodeResponse(socket),
668676
})
@@ -674,7 +682,7 @@ export async function initialize(opts: {
674682
}
675683

676684
if (parsedUrl.protocol) {
677-
return await proxyRequest(req, socket as any, parsedUrl, head)
685+
return await proxyRequest(req, socket, parsedUrl, head)
678686
}
679687

680688
// If there's no matched output, we don't handle the request as user's

packages/next/src/server/lib/router-utils/proxy-request.ts

Lines changed: 84 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ import type { NextUrlWithParsedQuery } from '../../request-meta'
33

44
import url from 'url'
55
import { stringifyQuery } from '../../server-route-utils'
6+
import { Duplex } from 'stream'
7+
import { DetachedPromise } from '../../../lib/detached-promise'
68

79
export async function proxyRequest(
810
req: IncomingMessage,
9-
res: ServerResponse,
11+
res: ServerResponse | Duplex,
1012
parsedUrl: NextUrlWithParsedQuery,
11-
upgradeHead?: any,
13+
upgradeHead?: Buffer,
1214
reqBody?: any,
1315
proxyTimeout?: number | null
1416
) {
@@ -33,83 +35,95 @@ export async function proxyRequest(
3335
},
3436
})
3537

36-
await new Promise((proxyResolve, proxyReject) => {
37-
let finished = false
38+
let finished = false
3839

39-
// http-proxy does not properly detect a client disconnect in newer
40-
// versions of Node.js. This is caused because it only listens for the
41-
// `aborted` event on the our request object, but it also fully reads
42-
// and closes the request object. Node **will not** fire `aborted` when
43-
// the request is already closed. Listening for `close` on our response
44-
// object will detect the disconnect, and we can abort the proxy's
45-
// connection.
46-
proxy.on('proxyReq', (proxyReq) => {
47-
res.on('close', () => proxyReq.destroy())
48-
})
49-
proxy.on('proxyRes', (proxyRes) => {
50-
if (res.destroyed) {
51-
proxyRes.destroy()
52-
} else {
53-
res.on('close', () => proxyRes.destroy())
54-
}
55-
})
40+
// http-proxy does not properly detect a client disconnect in newer
41+
// versions of Node.js. This is caused because it only listens for the
42+
// `aborted` event on the our request object, but it also fully reads
43+
// and closes the request object. Node **will not** fire `aborted` when
44+
// the request is already closed. Listening for `close` on our response
45+
// object will detect the disconnect, and we can abort the proxy's
46+
// connection.
47+
proxy.on('proxyReq', (proxyReq) => {
48+
res.on('close', () => proxyReq.destroy())
49+
})
5650

57-
proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
58-
const cleanup = (err: any) => {
59-
// cleanup event listeners to allow clean garbage collection
60-
proxyRes.removeListener('error', cleanup)
61-
proxyRes.removeListener('close', cleanup)
62-
innerRes.removeListener('error', cleanup)
63-
innerRes.removeListener('close', cleanup)
64-
65-
// destroy all source streams to propagate the caught event backward
66-
innerReq.destroy(err)
67-
proxyRes.destroy(err)
68-
}
51+
proxy.on('proxyRes', (proxyRes) => {
52+
if (res.destroyed) {
53+
proxyRes.destroy()
54+
} else {
55+
res.on('close', () => proxyRes.destroy())
56+
}
57+
})
6958

70-
proxyRes.once('error', cleanup)
71-
proxyRes.once('close', cleanup)
72-
innerRes.once('error', cleanup)
73-
innerRes.once('close', cleanup)
74-
})
59+
proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
60+
const cleanup = (err: any) => {
61+
// cleanup event listeners to allow clean garbage collection
62+
proxyRes.removeListener('error', cleanup)
63+
proxyRes.removeListener('close', cleanup)
64+
innerRes.removeListener('error', cleanup)
65+
innerRes.removeListener('close', cleanup)
66+
67+
// destroy all source streams to propagate the caught event backward
68+
innerReq.destroy(err)
69+
proxyRes.destroy(err)
70+
}
71+
72+
proxyRes.once('error', cleanup)
73+
proxyRes.once('close', cleanup)
74+
innerRes.once('error', cleanup)
75+
innerRes.once('close', cleanup)
76+
})
77+
78+
const detached = new DetachedPromise<boolean>()
79+
80+
// When the proxy finishes proxying the request, shut down the proxy.
81+
detached.promise.finally(() => {
82+
proxy.close()
83+
})
7584

76-
proxy.on('error', (err) => {
77-
console.error(`Failed to proxy ${target}`, err)
78-
if (!finished) {
79-
finished = true
80-
proxyReject(err)
85+
proxy.on('error', (err) => {
86+
console.error(`Failed to proxy ${target}`, err)
87+
if (!finished) {
88+
finished = true
89+
detached.reject(err)
8190

82-
if (!res.destroyed) {
91+
if (!res.destroyed) {
92+
if (!(res instanceof Duplex)) {
8393
res.statusCode = 500
84-
res.end('Internal Server Error')
8594
}
95+
96+
res.end('Internal Server Error')
8697
}
87-
})
98+
}
99+
})
88100

89-
// if upgrade head is present treat as WebSocket request
90-
if (upgradeHead) {
91-
proxy.on('proxyReqWs', (proxyReq) => {
92-
proxyReq.on('close', () => {
93-
if (!finished) {
94-
finished = true
95-
proxyResolve(true)
96-
}
97-
})
98-
})
99-
proxy.ws(req as any as IncomingMessage, res, upgradeHead)
100-
proxyResolve(true)
101-
} else {
102-
proxy.on('proxyReq', (proxyReq) => {
103-
proxyReq.on('close', () => {
104-
if (!finished) {
105-
finished = true
106-
proxyResolve(true)
107-
}
108-
})
101+
// If upgrade head is present or the response is a Duplex stream, treat as
102+
// WebSocket request.
103+
if (upgradeHead || res instanceof Duplex) {
104+
proxy.on('proxyReqWs', (proxyReq) => {
105+
proxyReq.on('close', () => {
106+
if (!finished) {
107+
finished = true
108+
detached.resolve(true)
109+
}
109110
})
110-
proxy.web(req, res, {
111-
buffer: reqBody,
111+
})
112+
proxy.ws(req, res, upgradeHead)
113+
detached.resolve(true)
114+
} else {
115+
proxy.on('proxyReq', (proxyReq) => {
116+
proxyReq.on('close', () => {
117+
if (!finished) {
118+
finished = true
119+
detached.resolve(true)
120+
}
112121
})
113-
}
114-
})
122+
})
123+
proxy.web(req, res, {
124+
buffer: reqBody,
125+
})
126+
}
127+
128+
return detached.promise
115129
}

0 commit comments

Comments
 (0)