diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index d92b747c..47dd4700 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -45,16 +45,6 @@ jobs: brew install autoconf automake libtool re2c bison libiconv \ argon2 libzip postgresql@16 - # TODO: Do we need to care about x86_64 macOS? - # NOTE: Unable to force link bison on macOS 13, which php-src requires. - - host: macos-13 - target: x86_64-apple-darwin - # build: pnpm build --target x86_64-apple-darwin - build: pnpm build - setup: | - brew install autoconf automake libtool re2c bison libiconv \ - argon2 libzip postgresql@16 - # # Linux # @@ -302,11 +292,6 @@ jobs: fail-fast: false matrix: settings: - - host: macos-13 - target: x86_64-apple-darwin - architecture: x64 - setup: | - brew install openssl@3 argon2 postgresql@16 - host: macos-15 target: aarch64-apple-darwin architecture: arm64 @@ -353,7 +338,20 @@ jobs: run: ls -R ./npm shell: bash - name: Test bindings - run: pnpm test + run: | + # Run tests with LLDB to catch segfaults on macOS + # Use the ava entrypoint directly instead of the bin script + lldb -b \ + -o "settings set auto-confirm true" \ + -o "process launch" \ + -o "bt all" \ + -- node node_modules/ava/entrypoints/cli.mjs __test__/handler.spec.mjs __test__/headers.spec.mjs __test__/request.spec.mjs __test__/response.spec.mjs __test__/rewriter.spec.mjs __test__/streaming.spec.mjs + + EXIT_CODE=$? + if [ $EXIT_CODE -ne 0 ]; then + echo "=== Test failed with exit code $EXIT_CODE ===" + exit $EXIT_CODE + fi test-linux-binding: name: Test bindings on ${{ matrix.target }} - node@${{ matrix.node }} @@ -431,9 +429,34 @@ jobs: libcurl4-openssl-dev autoconf libxml2-dev libsqlite3-dev \ bison re2c libonig-dev libargon2-dev libzip-dev zlib1g-dev \ openssh-client libclang-dev libreadline-dev libpng-dev \ - libjpeg-dev libsodium-dev libpq5 - - npm run test + libjpeg-dev libsodium-dev libpq5 gdb + + # Enable core dumps + ulimit -c unlimited + echo '/tmp/core.%e.%p' > /proc/sys/kernel/core_pattern || true + + # Run tests with GDB to catch segfaults + # Use the ava entrypoint directly instead of the bin script + gdb -batch \ + -ex "set pagination off" \ + -ex "set confirm off" \ + -ex "handle SIGSEGV stop print" \ + -ex "run" \ + -ex "thread apply all bt full" \ + --args node node_modules/ava/entrypoints/cli.mjs __test__/handler.spec.mjs __test__/headers.spec.mjs __test__/request.spec.mjs __test__/response.spec.mjs __test__/rewriter.spec.mjs __test__/streaming.spec.mjs + + EXIT_CODE=$? + if [ $EXIT_CODE -ne 0 ]; then + echo "=== Test failed with exit code $EXIT_CODE ===" + # Try to find and analyze core dumps + if ls /tmp/core.* 2>/dev/null; then + for core in /tmp/core.*; do + echo "=== Analyzing core dump: $core ===" + gdb -batch -ex "bt full" -ex "thread apply all bt" node "$core" || true + done + fi + exit $EXIT_CODE + fi publish: name: Publish a release diff --git a/Cargo.toml b/Cargo.toml index f95b5fa2..ba888d5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,20 +20,20 @@ name = "php-main" path = "src/main.rs" [dependencies] -async-trait = "0.1.88" bytes = "1.10.1" hostname = "0.4.1" -ext-php-rs = { version = "0.14.0", features = ["embed"] } +ext-php-rs = { version = "0.15.2", features = ["embed"] } +http-body-util = "0.1" http-handler = { git = "https://github.com/platformatic/http-handler.git" } # http-handler = { path = "../http-handler" } http-rewriter = { git = "https://github.com/platformatic/http-rewriter.git" } # http-rewriter = { path = "../http-rewriter" } libc = "0.2.171" # Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix -napi = { version = "3", default-features = false, features = ["napi4"], optional = true } +napi = { version = "3", default-features = false, features = ["napi4", "tokio_rt", "async"], optional = true } napi-derive = { version = "3", optional = true } once_cell = "1.21.0" -tokio = { version = "1.45", features = ["rt", "macros", "rt-multi-thread"] } +tokio = { version = "1.45", features = ["rt", "macros", "rt-multi-thread", "sync"] } regex = "1.0" [dev-dependencies] diff --git a/__test__/request.spec.mjs b/__test__/request.spec.mjs index 34ed4481..fd90852a 100644 --- a/__test__/request.spec.mjs +++ b/__test__/request.spec.mjs @@ -10,8 +10,6 @@ test('minimum construction requirements', (t) => { t.is(req.method, 'GET') t.is(req.url, 'http://example.com/test.php') - t.assert(req.body instanceof Buffer) - t.is(req.body.length, 0) t.assert(req.headers instanceof Headers) t.is(req.headers.size, 0) }) diff --git a/__test__/response.spec.mjs b/__test__/response.spec.mjs index b6a463bb..90b7f1f7 100644 --- a/__test__/response.spec.mjs +++ b/__test__/response.spec.mjs @@ -9,8 +9,6 @@ test('Minimal response construction', (t) => { t.is(res.status, 200) t.assert(res.headers instanceof Headers) - t.assert(res.body instanceof Buffer) - t.deepEqual(res.body.toString(), '') t.assert(res.log instanceof Buffer) t.deepEqual(res.log.toString(), '') t.is(res.exception, null) @@ -37,8 +35,6 @@ test('Full Response construction', (t) => { t.assert(res.headers instanceof Headers) t.deepEqual(res.headers.get('Content-Type'), 'application/json') t.deepEqual(res.headers.getAll('Accept'), ['application/json', 'text/plain']) - t.assert(res.body instanceof Buffer) - t.deepEqual(res.body.toString(), json) t.assert(res.log instanceof Buffer) t.deepEqual(res.log.toString(), 'Hello, from error_log!') t.deepEqual(res.exception, 'Hello, from PHP!') diff --git a/__test__/streaming.spec.mjs b/__test__/streaming.spec.mjs new file mode 100644 index 00000000..6126ebcd --- /dev/null +++ b/__test__/streaming.spec.mjs @@ -0,0 +1,251 @@ +import test from 'ava' + +import { Php, Request } from '../index.js' +import { MockRoot } from './util.mjs' + +test('handleStream - basic response', async (t) => { + const mockroot = await MockRoot.from({ + 'index.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'GET', + url: 'http://example.com/index.php' + }) + + const [res] = await Promise.all([ + php.handleStream(req), + req.end() + ]) + + t.is(res.status, 200) + + // Collect streaming body + let body = '' + for await (const chunk of res) { + body += chunk.toString('utf8') + } + t.is(body, 'Hello, from PHP!') +}) + +test('handleStream - chunked output', async (t) => { + const mockroot = await MockRoot.from({ + 'stream.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'GET', + url: 'http://example.com/stream.php' + }) + + const [res] = await Promise.all([ + php.handleStream(req), + req.end() + ]) + + t.is(res.status, 200) + + // Collect all chunks + const chunks = [] + for await (const chunk of res) { + chunks.push(chunk.toString('utf8')) + } + + // Should have received all chunks + const body = chunks.join('') + t.is(body, 'Chunk 1Chunk 2Chunk 3') +}) + +test('handleStream - headers available immediately', async (t) => { + const mockroot = await MockRoot.from({ + 'headers.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'GET', + url: 'http://example.com/headers.php' + }) + + const [res] = await Promise.all([ + php.handleStream(req), + req.end() + ]) + + // Headers should be available immediately + t.is(res.status, 200) + t.is(res.headers.get('x-custom-header'), 'test-value') + t.is(res.headers.get('content-type'), 'application/json') + + // Body can be consumed after + let body = '' + for await (const chunk of res) { + body += chunk.toString('utf8') + } + t.is(body, '{"status": "ok"}') +}) + +test('handleStream - POST with buffered body', async (t) => { + const mockroot = await MockRoot.from({ + 'echo.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'POST', + url: 'http://example.com/echo.php', + headers: { + 'Content-Type': 'text/plain' + }, + body: Buffer.from('Hello from client!') + }) + + const res = await php.handleStream(req) + t.is(res.status, 200) + + let body = '' + for await (const chunk of res) { + body += chunk.toString('utf8') + } + t.is(body, 'Received: Hello from client!') +}) + +test('handleStream - POST with streamed body', async (t) => { + const mockroot = await MockRoot.from({ + 'echo.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'POST', + url: 'http://example.com/echo.php', + headers: { + 'Content-Type': 'text/plain' + } + }) + + // Run handleStream and writes concurrently using Promise.all + const [res] = await Promise.all([ + php.handleStream(req), + (async () => { + await req.write('Hello ') + await req.write('from ') + await req.write('streaming!') + await req.end() + })() + ]) + + t.is(res.status, 200) + + let body = '' + for await (const chunk of res) { + body += chunk.toString('utf8') + } + t.is(body, 'Received: Hello from streaming!') +}) + +test.skip('handleStream - exception handling', async (t) => { + // TODO: Implement proper exception handling in streaming mode + // See EXCEPTIONS.md for implementation approaches + const mockroot = await MockRoot.from({ + 'error.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'GET', + url: 'http://example.com/error.php' + }) + + const res = await php.handleStream(req) + + // Exception should be sent through the stream + let errorOccurred = false + try { + for await (const chunk of res) { + // Should not receive chunks, should throw + } + } catch (err) { + errorOccurred = true + t.true(err.message.includes('Exception')) + } + + t.true(errorOccurred, 'Exception should be thrown during iteration') +}) + +test('handleStream - empty response', async (t) => { + const mockroot = await MockRoot.from({ + 'empty.php': `` + }) + t.teardown(() => mockroot.clean()) + + const php = new Php({ + docroot: mockroot.path + }) + + const req = new Request({ + method: 'GET', + url: 'http://example.com/empty.php' + }) + + const [res] = await Promise.all([ + php.handleStream(req), + req.end() + ]) + + t.is(res.status, 200) + + let body = '' + for await (const chunk of res) { + body += chunk.toString('utf8') + } + t.is(body, '') +}) diff --git a/index.d.ts b/index.d.ts index eab843eb..13bd930c 100644 --- a/index.d.ts +++ b/index.d.ts @@ -405,6 +405,8 @@ export declare class Request { /** * Get the body of the request as a Buffer. * + * Returns buffered data if the request was created with a body in the constructor. + * * # Examples * * ```js @@ -418,25 +420,7 @@ export declare class Request { * console.log(request.body.toString()); // {"message":"Hello, world!"} * ``` */ - get body(): Buffer - /** - * Set the body of the request. - * - * # Examples - * - * ```js - * const request = new Request({ - * url: "/v2/api/thing" - * }); - * - * request.body = Buffer.from(JSON.stringify({ - * message: 'Hello, world!' - * })); - * - * console.log(request.body.toString()); // {"message":"Hello, world!"} - * ``` - */ - set body(body: Buffer) + get body(): Buffer | null /** * Convert the response to a JSON object representation. * @@ -458,6 +442,39 @@ export declare class Request { * ``` */ toJSON(): object + /** + * Write a chunk to the request body stream + * + * # Examples + * + * ```js + * const request = new Request({ + * method: "POST", + * url: "/upload" + * }); + * + * await request.write(Buffer.from('chunk 1')); + * await request.write('chunk 2'); + * await request.end(); + * ``` + */ + write(chunk: Buffer | string): Promise + /** + * End the request body stream (HTTP mode only) + * + * # Examples + * + * ```js + * const request = new Request({ + * method: "POST", + * url: "/upload" + * }); + * + * await request.write(Buffer.from('data')); + * await request.end(); + * ``` + */ + end(): Promise } /** @@ -576,37 +593,28 @@ export declare class Response { */ set headers(headers: Headers) /** - * Get the body of the response as a Buffer. + * Get the buffered body of the response as a Buffer. * - * # Examples + * Note: With the new streaming architecture, response bodies are not buffered by default. + * This getter returns buffered data if it was explicitly buffered (e.g., by handleRequest). + * For streaming responses, use the AsyncIterator protocol via next(). * - * ```js - * const response = new Response({ - * body: Buffer.from(JSON.stringify({ - * message: 'Hello, world!' - * })) - * }); - * - * console.log(response.body.toString()); // {"message":"Hello, world!"} - * ``` - */ - get body(): Buffer - /** - * Set the body of the response. + * Returns `undefined` for streaming responses without buffering. * * # Examples * * ```js - * const response = new Response(); + * // After handleRequest (automatically buffered) + * const response = await python.handleRequest(request); + * console.log(response.body.toString()); // Works - body was buffered * - * response.body = Buffer.from(JSON.stringify({ - * message: 'Hello, world!' - * })); - * - * console.log(response.body.toString()); // {"message":"Hello, world!"} + * // For streaming responses, use AsyncIterator + * for await (const chunk of response) { + * console.log(chunk.toString()); + * } * ``` */ - set body(body: Buffer) + get body(): Buffer | null /** * Get the log of the response as a Buffer. * @@ -655,6 +663,22 @@ export declare class Response { * ``` */ toJSON(): object + /** + * Read the next chunk from the response body stream + * + * Returns the next chunk as a Buffer, or undefined if the stream has ended. + * This method is used to implement AsyncIterator in JavaScript. + * + * For WebSocket responses (when WebSocketMode extension is present), this automatically + * decodes WebSocket frames and returns the payload data. + * + * # Examples + * + * ```js + * console.log(await response.next()); // Buffer | undefined + * ``` + */ + next(): Promise } /** A multi-map of HTTP headers. Any given header key can have multiple values. */ @@ -679,6 +703,8 @@ export interface RequestOptions { socket?: SocketInfo /** Document root for the request, if applicable. */ docroot?: string + /** Whether this is a WebSocket request. */ + websocket?: boolean } /** Input options for creating a Response. */ @@ -1387,6 +1413,35 @@ export declare class Php { * ``` */ handleRequestSync(request: PhpRequest): PhpResponse + /** + * Handle a streaming PHP request. + * + * Returns immediately after headers are sent, with body chunks streaming + * asynchronously. Use the AsyncIterator interface to read the response body. + * + * # Examples + * + * ```js + * const php = new Php({ + * docroot: process.cwd(), + * argv: process.argv + * }); + * + * const response = await php.handleStream(new Request({ + * method: 'GET', + * url: 'http://example.com' + * })); + * + * console.log(response.status); // Available immediately + * console.log(response.headers); // Available immediately + * + * // Read streaming body + * for await (const chunk of response) { + * console.log('Chunk:', chunk.toString()); + * } + * ``` + */ + handleStream(request: PhpRequest, signal?: AbortSignal | undefined | null): Promise } export type PhpRuntime = Php diff --git a/npm/darwin-universal/README.md b/npm/darwin-universal/README.md deleted file mode 100644 index 4bd654f8..00000000 --- a/npm/darwin-universal/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `@platformatic/php-darwin-universal` - -This is the **universal-apple-darwin** binary for `@platformatic/php` diff --git a/npm/darwin-universal/package.json b/npm/darwin-universal/package.json deleted file mode 100644 index 0ee34184..00000000 --- a/npm/darwin-universal/package.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "name": "@platformatic/php-darwin-universal", - "version": "0.0.0", - "os": [ - "darwin" - ], - "main": "php.darwin-universal.node", - "files": [ - "php.darwin-universal.node" - ], - "license": "MIT", - "engines": { - "node": ">= 10" - } -} \ No newline at end of file diff --git a/npm/darwin-x64/README.md b/npm/darwin-x64/README.md deleted file mode 100644 index 3fdd968c..00000000 --- a/npm/darwin-x64/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# `@platformatic/php-darwin-x64` - -This is the **x86_64-apple-darwin** binary for `@platformatic/php` diff --git a/npm/darwin-x64/package.json b/npm/darwin-x64/package.json deleted file mode 100644 index 8a5e31b6..00000000 --- a/npm/darwin-x64/package.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "name": "@platformatic/php-darwin-x64", - "version": "0.0.0", - "os": [ - "darwin" - ], - "cpu": [ - "x64" - ], - "main": "php.darwin-x64.node", - "files": [ - "php.darwin-x64.node" - ], - "license": "MIT", - "engines": { - "node": ">= 10" - } -} \ No newline at end of file diff --git a/package.json b/package.json index da922cee..6d5af00c 100644 --- a/package.json +++ b/package.json @@ -11,9 +11,7 @@ "binaryName": "php", "targets": [ "aarch64-apple-darwin", - "x86_64-apple-darwin", - "x86_64-unknown-linux-gnu", - "universal-apple-darwin" + "x86_64-unknown-linux-gnu" ] }, "license": "MIT", @@ -34,7 +32,6 @@ "build:debug": "napi build --platform --no-js --output-dir . --features napi-support -- --lib", "lint": "oxlint", "test": "ava __test__/**.spec.mjs", - "universal": "napi universal", "version": "napi version" } } diff --git a/src/embed.rs b/src/embed.rs index 14ea8e31..38a652b9 100644 --- a/src/embed.rs +++ b/src/embed.rs @@ -6,13 +6,14 @@ use std::{ }; use ext_php_rs::{ - alloc::{efree, estrdup}, + alloc::estrdup, error::Error, - ffi::{php_execute_script, sapi_get_default_content_type}, + ffi::php_execute_script, zend::{try_catch, try_catch_first, ExecutorGlobals, SapiGlobals}, }; -use http_handler::{Handler, Request, Response, ResponseBuilderExt}; +use http_handler::types::{Request, Response}; +use http_handler::Handler; use super::{ sapi::{ensure_sapi, Sapi}, @@ -21,6 +22,37 @@ use super::{ EmbedRequestError, EmbedStartError, RequestContext, }; +/// Extension type to keep the blocking PHP task alive while the response is being consumed +#[derive(Clone)] +pub struct BlockingTaskHandle( + Arc>>>>, +); + +impl BlockingTaskHandle { + /// Get access to the inner mutex for awaiting the blocking task + #[cfg(feature = "napi-support")] + pub async fn take_and_await(&self) { + let mut guard = self.0.lock().await; + if let Some(handle) = guard.take() { + let _ = handle.await; + } + } +} + +impl Drop for BlockingTaskHandle { + fn drop(&mut self) { + // Only abort if this is the last reference + if Arc::strong_count(&self.0) == 1 { + // Try to take and abort the handle without blocking + if let Ok(mut guard) = self.0.try_lock() { + if let Some(handle) = guard.take() { + handle.abort(); + } + } + } + } +} + /// A simple trait for rewriting requests that works with our specific request type pub trait RequestRewriter: Send + Sync { /// Rewrite the given request and return the modified request @@ -189,11 +221,13 @@ impl Embed { } } -#[async_trait::async_trait] impl Handler for Embed { type Error = EmbedRequestError; - /// Handles an HTTP request. + /// Handles an HTTP request with streaming response. + /// + /// Returns immediately after headers are sent, with body chunks streaming asynchronously. + /// Buffering is handled externally by NAPI Task compute() methods when needed. /// /// # Examples /// @@ -209,170 +243,228 @@ impl Handler for Embed { /// let handler = Embed::new(docroot.clone(), None) /// .expect("should construct Embed"); /// + /// # tokio::runtime::Runtime::new().unwrap().block_on(async { + /// let body = http_handler::RequestBody::new(); + /// + /// // Close the request body stream - callers must always shutdown the stream before calling handle() + /// { + /// use tokio::io::AsyncWriteExt; + /// let mut body_writer = body.clone(); + /// body_writer.shutdown().await.expect("should close request body stream"); + /// } + /// /// let request = http_handler::request::Request::builder() /// .method("GET") - /// .uri("http://example.com") - /// .body(bytes::BytesMut::new()) + /// .uri("http://example.com/index.php") + /// .body(body) /// .expect("should build request"); /// - /// # tokio_test::block_on(async { /// let response = handler.handle(request) /// .await /// .expect("should handle request"); - /// # }); /// - /// //assert_eq!(response.status(), 200); - /// //assert_eq!(response.body(), "Hello, world!"); + /// // Consume the streaming response body to ensure PHP task completes + /// use http_body_util::BodyExt; + /// let (_parts, body) = response.into_parts(); + /// let mut stream = body; + /// while let Some(frame_result) = stream.frame().await { + /// match frame_result { + /// Ok(_) => continue, + /// Err(e) => panic!("Error reading response: {}", e), + /// } + /// } + /// + /// drop(handler); + /// # }); /// ``` + /// Handle a PHP request with streaming response. + /// + /// Returns immediately after headers are sent, with body chunks streaming asynchronously. + /// All buffering is external to this method, handled by NAPI Task compute() methods. async fn handle(&self, request: Request) -> Result { - let docroot = self.docroot.clone(); - - // Initialize the SAPI module - self.sapi.startup()?; - - // Get REQUEST_URI _first_ as it needs the pre-rewrite state. - let url = request.uri(); - let request_uri = url.path(); - - // Apply request rewriting rules - let mut request = request.clone(); - if let Some(rewriter) = &self.rewriter { - request = rewriter + use tokio::sync::oneshot; + + // Extract owned data from request BEFORE any PHP operations + let uri = request.uri().clone(); + let request_uri_str = uri.path().to_string(); + let method_str = request.method().as_str().to_string(); + let query_str = uri.query().unwrap_or("").to_string(); + + // Clone headers as owned HashMap + let headers_map: std::collections::HashMap = request + .headers() + .iter() + .filter_map(|(k, v)| { + v.to_str() + .ok() + .map(|val| (k.as_str().to_string(), val.to_string())) + }) + .collect(); + + // Apply request rewriting rules (on async thread, before blocking) + let request = if let Some(rewriter) = &self.rewriter { + rewriter .rewrite_request(request, &self.docroot) - .map_err(|e| EmbedRequestError::RequestRewriteError(e.to_string()))?; - } + .map_err(|e| EmbedRequestError::RequestRewriteError(e.to_string()))? + } else { + request + }; - let translated_path = translate_path(&docroot, request.uri().path())? + // Translate path on async thread + let docroot = self.docroot.clone(); + let translated_path_str = translate_path(&docroot, request.uri().path())? .display() .to_string(); - // Convert REQUEST_URI and PATH_TRANSLATED to C strings - let request_uri = estrdup(request_uri); - let path_translated = estrdup(translated_path.clone()); - - // Extract request method, query string, and headers - let request_method = estrdup(request.method().as_str()); - let query_string = estrdup(url.query().unwrap_or("")); - - let headers = request.headers(); - let content_type = headers - .get("Content-Type") - .and_then(|v| v.to_str().ok()) - .map(estrdup) - .unwrap_or(std::ptr::null_mut()); - let content_length = headers - .get("Content-Length") - .and_then(|v| v.to_str().ok()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(0); - - // Prepare argv and argc - let argc = self.args.len() as i32; - let mut argv_ptrs = vec![]; - for arg in self.args.iter() { - argv_ptrs.push(estrdup(arg.to_owned())); - } + // Don't buffer request body here - let read_post stream it as PHP requests it + // The request body channel will be read by read_post when PHP calls file_get_contents('php://input') + // Note: BodyBuffer extension (if present) is written to the stream in the NAPI Task layer + + // Create streaming response body + let response_body = request.body().create_response(); + let response_writer = response_body.clone(); - let script_name = translated_path.clone(); + // Channel to receive headers + status + custom headers + logs when ready (send owned data) + let (headers_sent_tx, headers_sent_rx) = + oneshot::channel::<(u16, String, Vec<(String, String)>, bytes::Bytes)>(); - // Fixed RefUnwindSafe issue (FIXME.md #1) by setting up RequestContext before try_catch_first - // This avoids the need to rebuild the request inside the closure - RequestContext::for_request(request, docroot.clone()); + // Clone args and sapi for the blocking task + let args = self.args.clone(); + let sapi = self.sapi.clone(); - let response = try_catch_first(move || { - // Set server context + // Spawn blocking PHP execution - ALL PHP operations happen here + let blocking_handle = tokio::task::spawn_blocking(move || { + // Initialize SAPI on the PHP thread (per-thread initialization required) + if let Err(e) = sapi.startup() { + return Err(e); + } + + // NOW do all the estrdup calls on the PHP thread + let request_uri_c = estrdup(request_uri_str.as_str()); + let path_translated = estrdup(translated_path_str.as_str()); + let request_method = estrdup(method_str.as_str()); + let query_string = estrdup(query_str.as_str()); + + let content_type = headers_map + .get("content-type") + .or_else(|| headers_map.get("Content-Type")) + .map(|s| estrdup(s.as_str())) + .unwrap_or(std::ptr::null_mut()); + + let content_length = headers_map + .get("content-length") + .or_else(|| headers_map.get("Content-Length")) + .and_then(|s| s.parse::().ok()) + .unwrap_or(-1); // -1 means unknown length for streaming requests + + // Prepare argv + let argc = args.len() as i32; + let mut argv_ptrs = vec![]; + for arg in args.iter() { + argv_ptrs.push(estrdup(arg.as_str())); + } + + // Setup RequestContext (always streaming from SAPI perspective) + // RequestContext::new() will extract the request body's read stream and add it as RequestStream extension + let ctx = RequestContext::new( + request, + docroot.clone(), + response_writer.clone(), + headers_sent_tx, + ); + RequestContext::set_current(Box::new(ctx)); + + // Set SAPI globals { let mut globals = SapiGlobals::get_mut(); globals.options |= ext_php_rs::ffi::SAPI_OPTION_NO_CHDIR as i32; - - // Reset state globals.request_info.proto_num = 110; globals.request_info.argc = argc; globals.request_info.argv = argv_ptrs.as_mut_ptr(); globals.request_info.headers_read = false; globals.sapi_headers.http_response_code = 200; - - // Set request info from request globals.request_info.request_method = request_method; globals.request_info.query_string = query_string; globals.request_info.path_translated = path_translated; - globals.request_info.request_uri = request_uri; - - // TODO: Add auth fields - + globals.request_info.request_uri = request_uri_c; globals.request_info.content_type = content_type; globals.request_info.content_length = content_length; } - let _request_scope = RequestScope::new()?; - - // Run script in its own try/catch so bailout doesn't skip request shutdown. - { - let mut file_handle = FileHandleScope::new(script_name.clone()); - try_catch(|| unsafe { php_execute_script(file_handle.deref_mut()) }) - .map_err(|_| EmbedRequestError::Bailout)?; - } + let result = try_catch_first(|| { + let _request_scope = RequestScope::new()?; - if let Some(err) = ExecutorGlobals::take_exception() { + // Execute PHP script { - let mut globals = SapiGlobals::get_mut(); - globals.sapi_headers.http_response_code = 500; + let mut file_handle = FileHandleScope::new(translated_path_str.clone()); + try_catch(|| unsafe { php_execute_script(file_handle.deref_mut()) }) + .map_err(|_| EmbedRequestError::Bailout)?; } - let ex = Error::Exception(err); - - if let Some(ctx) = RequestContext::current() { - let builder = std::mem::replace( - ctx.response_builder_mut(), - http_handler::response::Builder::new(), - ); - let builder = builder.exception(ex.to_string()).status(500); - *ctx.response_builder_mut() = builder; + // Handle exceptions + if let Some(err) = ExecutorGlobals::take_exception() { + let ex = Error::Exception(err); + return Err(EmbedRequestError::Exception(ex.to_string())); } - return Err(EmbedRequestError::Exception(ex.to_string())); - }; + Ok(()) + // RequestScope drops here, triggering request shutdown + // Output buffering flush happens during shutdown, calling ub_write + // RequestContext must still be alive at this point! + }); - let (mut mimetype, http_response_code) = { - let h = SapiGlobals::get().sapi_headers; - (h.mimetype, h.http_response_code) + // IMPORTANT: Shutdown response stream before reclaiming RequestContext + // This signals EOF to response body consumers waiting on the stream + if let Some(ctx) = RequestContext::current() { + ctx.shutdown_response_stream(); + } + + // Reclaim RequestContext AFTER RequestScope has dropped + // This ensures output buffer flush during shutdown can still access the context + let _ctx = RequestContext::reclaim(); + + // Flatten the result + let final_result = match result { + Ok(Ok(())) => Ok(()), + Ok(Err(e)) => Err(e), + Err(_) => Err(EmbedRequestError::Bailout), }; + final_result + }); - if mimetype.is_null() { - mimetype = unsafe { sapi_get_default_content_type() }; - } + // Wait for headers to be sent (with owned status, mimetype, custom headers, and logs) + // The JavaScript code should call req.end() concurrently using Promise.all to avoid deadlock + let (status, mime_str, custom_headers, logs) = headers_sent_rx + .await + .map_err(|_| EmbedRequestError::ResponseBuildError)?; - let mime_ptr = - unsafe { mimetype.as_ref() }.ok_or(EmbedRequestError::FailedToDetermineContentType)?; + // Build response with headers and streaming body (on async thread, using owned data) + let mut builder = http_handler::response::Builder::new() + .status(status) + .header("Content-Type", mime_str); - let mime = unsafe { std::ffi::CStr::from_ptr(mime_ptr) } - .to_str() - .map_err(|_| EmbedRequestError::FailedToDetermineContentType)? - .to_owned(); + // Add custom headers from PHP header() calls + for (name, value) in custom_headers { + builder = builder.header(name, value); + } - unsafe { - efree(mimetype.cast::()); - } + let mut response = builder + .body(response_body) + .map_err(|_| EmbedRequestError::ResponseBuildError)?; - // Set the final status and content-type header - if let Some(ctx) = RequestContext::current() { - let builder = std::mem::replace( - ctx.response_builder_mut(), - http_handler::response::Builder::new(), - ); - let builder = builder - .status(http_response_code as u16) - .header("Content-Type", mime); - *ctx.response_builder_mut() = builder; - } + // Store logs in extensions for streaming mode (available but not streamed) + if !logs.is_empty() { + response + .extensions_mut() + .insert(http_handler::ResponseLog::from_bytes(logs)); + } - // Build the final response with accumulated data using the extension system - RequestContext::reclaim() - .ok_or(EmbedRequestError::ResponseBuildError)? - .build_response() - .map_err(|_| EmbedRequestError::ResponseBuildError) - }) - .unwrap_or(Err(EmbedRequestError::Bailout))?; + // Store the blocking task handle to keep it alive while response is consumed + response + .extensions_mut() + .insert(BlockingTaskHandle(Arc::new(tokio::sync::Mutex::new(Some( + blocking_handle, + ))))); Ok(response) } diff --git a/src/exception.rs b/src/exception.rs index c56d3d92..9b922c54 100644 --- a/src/exception.rs +++ b/src/exception.rs @@ -73,6 +73,9 @@ pub enum EmbedRequestError { /// Error during request rewriting RequestRewriteError(String), + + /// Error handling request body stream + RequestBodyError(String), } impl std::fmt::Display for EmbedRequestError { @@ -108,6 +111,9 @@ impl std::fmt::Display for EmbedRequestError { EmbedRequestError::RequestRewriteError(e) => { write!(f, "Request rewrite error: {}", e) } + EmbedRequestError::RequestBodyError(e) => { + write!(f, "Request body error: {}", e) + } } } } diff --git a/src/extensions.rs b/src/extensions.rs new file mode 100644 index 00000000..6b698143 --- /dev/null +++ b/src/extensions.rs @@ -0,0 +1,50 @@ +/// Custom extension types for RequestWrapper +/// +/// These extensions store request-specific state that needs to be shared +/// across SAPI callbacks and async boundaries. +use bytes::Bytes; +use http_handler::ResponseBody; +use std::sync::Arc; +use tokio::sync::{oneshot, Mutex}; + +/// Extension for storing the response body stream +/// +/// Allows SAPI callbacks to write response body data. +#[derive(Clone)] +pub struct ResponseStream(pub ResponseBody); + +impl ResponseStream { + /// Create a new ResponseStream extension with the given response body + pub fn new(body: ResponseBody) -> Self { + Self(body) + } +} + +/// Extension for storing the headers sent notification channel +/// +/// Used to signal when headers are finalized and send them to the response builder. +/// Contains: (status_code, mime_type, custom_headers, logs) +#[derive(Clone)] +pub struct HeadersSentTx( + pub Arc, Bytes)>>>>, +); + +impl HeadersSentTx { + /// Create a new HeadersSentTx extension with the given oneshot sender + pub fn new(sender: oneshot::Sender<(u16, String, Vec<(String, String)>, Bytes)>) -> Self { + Self(Arc::new(Mutex::new(Some(sender)))) + } +} + +/// Extension for storing the request body stream +/// +/// Allows SAPI callbacks to read streaming request body data. +#[derive(Clone)] +pub struct RequestStream(pub http_handler::RequestBody); + +impl RequestStream { + /// Create a new RequestStream extension with the given request body + pub fn new(body: http_handler::RequestBody) -> Self { + Self(body) + } +} diff --git a/src/lib.rs b/src/lib.rs index 6d4c6f7d..da8d2c37 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,23 +24,44 @@ //! let embed = Embed::new_with_args(docroot, None, args()) //! .expect("should construct embed"); //! +//! # tokio_test::block_on(async { +//! let body = http_handler::RequestBody::new(); +//! +//! // Write body data and close the stream +//! { +//! use tokio::io::AsyncWriteExt; +//! let mut body_writer = body.clone(); +//! body_writer.write_all(b"Hello, World!").await.expect("should write body"); +//! body_writer.shutdown().await.expect("should close request body stream"); +//! } +//! //! let request = http_handler::request::Request::builder() //! .method("POST") //! .uri("http://example.com/index.php") //! .header("Content-Type", "text/html") //! .header("Content-Length", "13") -//! .body(bytes::BytesMut::from("Hello, World!")) +//! .body(body) //! .expect("should build request"); //! -//! # tokio_test::block_on(async { //! let response = embed //! .handle(request.clone()) //! .await //! .expect("should handle request"); //! //! assert_eq!(response.status(), 200); -//! assert_eq!(response.body(), "Hello, World!"); -//! println!("Response: {:#?}", response); +//! +//! // Consume the streaming response body to ensure PHP task completes +//! use http_body_util::BodyExt; +//! let (_parts, body) = response.into_parts(); +//! let mut stream = body; +//! while let Some(frame_result) = stream.frame().await { +//! match frame_result { +//! Ok(_) => continue, +//! Err(e) => panic!("Error reading response: {}", e), +//! } +//! } +//! +//! drop(embed); //! # }); //! ``` @@ -54,6 +75,7 @@ extern crate napi_derive; mod embed; mod exception; +mod extensions; mod request_context; mod sapi; mod scopes; @@ -64,9 +86,8 @@ mod test; /// NAPI bindings for exposing PHP to Node.js pub mod napi; -pub use http_handler::{ - Handler, Request, RequestBuilderExt, Response, ResponseException, ResponseExt, -}; +pub use http_handler::types::{Request, Response}; +pub use http_handler::{Handler, RequestBuilderExt, ResponseException, ResponseExt}; pub use http_rewriter as rewrite; // Re-export commonly used types from http crate @@ -77,5 +98,6 @@ pub use http_handler::{ pub use embed::{Embed, RequestRewriter}; pub use exception::{EmbedRequestError, EmbedStartError}; +pub use extensions::{HeadersSentTx, RequestStream, ResponseStream}; pub use request_context::RequestContext; pub use test::{MockRoot, MockRootBuilder}; diff --git a/src/main.rs b/src/main.rs index 9020f743..f38fa8a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use std::{env::current_dir, fs::File, io::Write, path::PathBuf}; -use bytes::BytesMut; -use php::{rewrite::PathRewriter, Embed, Handler, Request, RequestRewriter}; +use php::{rewrite::PathRewriter, Embed, Handler, RequestRewriter}; #[tokio::main] async fn main() { @@ -15,16 +14,18 @@ async fn main() { let embed = Embed::new_with_args(docroot, maybe_rewriter, std::env::args()) .expect("should construct embed"); - // Build request using the re-exported Request type from http crate - let mut request = Request::new(BytesMut::from("Hello, World!")); - *request.method_mut() = "POST".parse().unwrap(); - *request.uri_mut() = "http://example.com/test.php".parse().unwrap(); - request - .headers_mut() - .insert("Content-Type", "text/html".parse().unwrap()); - request - .headers_mut() - .insert("Content-Length", "13".parse().unwrap()); + // Build request using RequestBody + let body = http_handler::RequestBody::from_data(bytes::Bytes::from("Hello, World!")) + .await + .expect("should create body"); + + let request = http_handler::request::Request::builder() + .method("POST") + .uri("http://example.com/test.php") + .header("Content-Type", "text/html") + .header("Content-Length", "13") + .body(body) + .expect("should build request"); println!("request: {:#?}", request); diff --git a/src/napi.rs b/src/napi.rs index e400d99f..1112409f 100644 --- a/src/napi.rs +++ b/src/napi.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use napi::bindgen_prelude::*; use napi::{Env, Error, Result, Task}; -use crate::{Embed, EmbedRequestError, Handler, Request, RequestRewriter, Response}; +use crate::embed::BlockingTaskHandle; +use crate::sapi::fallback_handle; +use crate::{Embed, EmbedRequestError, Handler, RequestRewriter}; +use crate::{Request, Response}; use http_handler::napi::{Request as PhpRequest, Response as PhpResponse}; use http_rewriter::napi::Rewriter as NapiRewriter; @@ -115,14 +118,14 @@ impl PhpRuntime { #[napi] pub fn handle_request( &self, - request: &PhpRequest, + request: PhpRequest, signal: Option, ) -> AsyncTask { AsyncTask::with_optional_signal( PhpRequestTask { throw_request_errors: self.throw_request_errors, embed: self.embed.clone(), - request: request.clone().into_inner(), + request: Some(request.into_inner()), }, signal, ) @@ -147,21 +150,63 @@ impl PhpRuntime { /// console.log(response.body); /// ``` #[napi] - pub fn handle_request_sync(&self, request: &PhpRequest) -> Result { + pub fn handle_request_sync(&self, request: PhpRequest) -> Result { let mut task = PhpRequestTask { throw_request_errors: self.throw_request_errors, embed: self.embed.clone(), - request: request.clone().into_inner(), + request: Some(request.into_inner()), }; task.compute().map(Into::::into) } + + /// Handle a streaming PHP request. + /// + /// Returns immediately after headers are sent, with body chunks streaming + /// asynchronously. Use the AsyncIterator interface to read the response body. + /// + /// # Examples + /// + /// ```js + /// const php = new Php({ + /// docroot: process.cwd(), + /// argv: process.argv + /// }); + /// + /// const response = await php.handleStream(new Request({ + /// method: 'GET', + /// url: 'http://example.com' + /// })); + /// + /// console.log(response.status); // Available immediately + /// console.log(response.headers); // Available immediately + /// + /// // Read streaming body + /// for await (const chunk of response) { + /// console.log('Chunk:', chunk.toString()); + /// } + /// ``` + #[napi] + pub fn handle_stream( + &self, + request: PhpRequest, + signal: Option, + ) -> AsyncTask { + AsyncTask::with_optional_signal( + PhpStreamTask { + throw_request_errors: self.throw_request_errors, + embed: self.embed.clone(), + request: Some(request.into_inner()), + }, + signal, + ) + } } /// Task container to run a PHP request in a worker thread. pub struct PhpRequestTask { embed: Arc, - request: Request, + request: Option, throw_request_errors: bool, } @@ -172,22 +217,122 @@ impl Task for PhpRequestTask { // Handle the PHP request in the worker thread. fn compute(&mut self) -> Result { - let runtime = tokio::runtime::Runtime::new().map_err(|e| Error::from_reason(e.to_string()))?; - let mut result = runtime.block_on(self.embed.handle(self.request.clone())); + let request = self + .request + .take() + .ok_or_else(|| Error::from_reason("Request already consumed"))?; + + // Extract body buffer and body handle before moving request + let body_buffer_data: Option = + if let Some(buf) = request.extensions().get::() { + if !buf.is_empty() { + Some(bytes::Bytes::copy_from_slice(buf.as_bytes())) + } else { + None + } + } else { + None + }; + let request_body = request.body().clone(); + + // Write request body concurrently with handle() to avoid thread pool exhaustion + // If a BodyBuffer is present, write and close the stream automatically + // Otherwise, JavaScript code must call request.end() to close the stream + let _write_handle = fallback_handle().spawn(async move { + use tokio::io::AsyncWriteExt; + + let mut body = request_body; + + // If there's a BodyBuffer extension, write it (if not empty) + if let Some(data) = body_buffer_data { + let _ = body.write_all(&data).await; + } + + // Always close the stream, even when there's no body + let _ = body.shutdown().await; + }); + + // Call handle() which returns a streaming response + // We need to buffer it here for handleRequest/handleRequestSync backward compatibility + // Use fallback_handle() to avoid deadlocks in spawn_blocking contexts + let mut result = fallback_handle().block_on(async { + use http_body_util::BodyExt; + + let response = self.embed.handle(request).await?; + + // Buffer the streaming body for backward compatibility + let (parts, body) = response.into_parts(); + + // Collect body chunks, capturing any exceptions sent through the stream + let mut body_buffer = bytes::BytesMut::new(); + let mut exception: Option = None; + + let mut stream = body; + loop { + match stream.frame().await { + Some(frame_result) => match frame_result { + Ok(frame) => { + if let Ok(data) = frame.into_data() { + body_buffer.extend_from_slice(&data); + } + } + Err(e) => { + exception = Some(e.to_string()); + break; + } + }, + None => break, + } + } + + // Extract and await the blocking task handle to ensure PHP execution completes + if let Some(handle_wrapper) = parts.extensions.get::() { + handle_wrapper.take_and_await().await; + } + + let mut response = Response::from_parts(parts, http_handler::ResponseBody::new()); + + // Add buffered body to extensions + response + .extensions_mut() + .insert(http_handler::BodyBuffer::from_bytes(body_buffer.freeze())); + + // Add exception to extensions if one occurred + if let Some(ex) = exception { + response + .extensions_mut() + .insert(http_handler::ResponseException(ex)); + } + + Ok::<_, EmbedRequestError>(response) + }); // Translate the various error types into HTTP error responses if !self.throw_request_errors { result = result.or_else(|err| { - Ok(match err { - EmbedRequestError::ScriptNotFound(_script_name) => http_handler::response::Builder::new() - .status(404) - .body(bytes::BytesMut::from("Not Found")) - .unwrap(), - _ => http_handler::response::Builder::new() - .status(500) - .body(bytes::BytesMut::from("Internal Server Error")) - .unwrap(), - }) + let (mut response, body_content) = match err { + EmbedRequestError::ScriptNotFound(_script_name) => ( + http_handler::response::Builder::new() + .status(404) + .body(http_handler::ResponseBody::new()) + .unwrap(), + "Not Found", + ), + _ => ( + http_handler::response::Builder::new() + .status(500) + .body(http_handler::ResponseBody::new()) + .unwrap(), + "Internal Server Error", + ), + }; + + // Add body content as BodyBuffer extension + response + .extensions_mut() + .insert(http_handler::BodyBuffer::from_bytes(body_content)); + + Ok(response) }) } @@ -199,3 +344,104 @@ impl Task for PhpRequestTask { Ok(Into::::into(output)) } } + +/// Task container to run a PHP streaming request in a worker thread. +pub struct PhpStreamTask { + embed: Arc, + request: Option, + throw_request_errors: bool, +} + +#[napi] +impl Task for PhpStreamTask { + type Output = Response; + type JsValue = Object<'static>; + + // Handle the PHP streaming request in the worker thread. + fn compute(&mut self) -> Result { + // Take ownership of the request to preserve the body channels + let request = self + .request + .take() + .ok_or_else(|| Error::from_reason("Request already consumed"))?; + + // Extract body buffer and body handle before moving request + let body_buffer_data: Option = + if let Some(buf) = request.extensions().get::() { + if !buf.is_empty() { + Some(bytes::Bytes::copy_from_slice(buf.as_bytes())) + } else { + None + } + } else { + None + }; + let has_body_buffer = body_buffer_data.is_some(); + let request_body = request.body().clone(); + + // Write request body concurrently with handle() to avoid thread pool exhaustion + // If a BodyBuffer is present, write and close the stream automatically + // Otherwise, JavaScript code must call request.end() to close the stream + let _write_handle = fallback_handle().spawn(async move { + use tokio::io::AsyncWriteExt; + + let mut body = request_body; + + // If body was provided in constructor (BodyBuffer exists), write it and close the stream + if let Some(data) = body_buffer_data { + let _ = body.write_all(&data).await; + } + // Close the stream if body was fully provided + if has_body_buffer { + let _ = body.shutdown().await; + } + // If no BodyBuffer, JavaScript will write via req.write() and close via req.end() + // Don't touch the stream here to avoid "broken pipe" errors + }); + + // Use fallback_handle() to avoid deadlocks in spawn_blocking contexts + let mut result = fallback_handle().block_on(async { + // Let write task run concurrently with handle() + self.embed.handle(request).await + }); + + // Translate the various error types into HTTP error responses + if !self.throw_request_errors { + result = result.or_else(|err| { + let (mut response, body_content) = match err { + EmbedRequestError::ScriptNotFound(_script_name) => ( + http_handler::response::Builder::new() + .status(404) + .body(http_handler::ResponseBody::new()) + .unwrap(), + "Not Found", + ), + _ => ( + http_handler::response::Builder::new() + .status(500) + .body(http_handler::ResponseBody::new()) + .unwrap(), + "Internal Server Error", + ), + }; + + // Store body content in BodyBuffer extension + response + .extensions_mut() + .insert(http_handler::BodyBuffer::from_bytes(bytes::Bytes::from( + body_content, + ))); + + Ok(response) + }); + } + + result.map_err(|e| Error::from_reason(e.to_string())) + } + + // Handle converting the PHP response to a JavaScript response in the main thread. + fn resolve(&mut self, env: Env, output: Self::Output) -> Result { + let response: PhpResponse = Into::::into(output); + response.make_streamable(env) + } +} diff --git a/src/request_context.rs b/src/request_context.rs index 6b1a9cbb..51427905 100644 --- a/src/request_context.rs +++ b/src/request_context.rs @@ -1,25 +1,93 @@ +/// Request wrapper for the PHP SAPI +/// +/// This wrapper stores the minimal non-Clone state that cannot be moved to extensions: +/// - The Request itself (for Deref access) +/// - The response Builder (non-Clone, cannot use std::mem::replace pattern) +/// - The request Receiver (non-Clone, single ownership) +/// +/// All shareable state is stored in Request extensions: +/// - DocumentRoot (http-handler) - docroot path +/// - ResponseLog (http-handler) - log buffer +/// - BodyBuffer (http-handler) - request body buffer +/// - ResponseStream (custom) - response body stream +/// - RequestStream (custom) - request body stream +/// - HeadersSentTx (custom) - headers sent notification +use bytes::Bytes; use ext_php_rs::zend::SapiGlobals; -use http_handler::{BodyBuffer, Request}; -use std::{ffi::c_void, path::PathBuf}; +use http_handler::extensions::{BodyBuffer, DocumentRoot, ResponseLog}; +use http_handler::types::Request; +use http_handler::RequestExt; +use std::ffi::c_void; +use std::ops::{Deref, DerefMut}; +use std::path::Path; +use tokio::sync::oneshot; + +use crate::extensions::{HeadersSentTx, RequestStream, ResponseStream}; /// The request context for the PHP SAPI. -pub struct RequestContext { - request: Request, - response_builder: http_handler::response::Builder, - docroot: PathBuf, +/// +/// This is a minimal wrapper around Request that provides Deref/DerefMut access. +/// All state is stored in Request extensions. +pub struct RequestContext(Request); + +impl Deref for RequestContext { + type Target = Request; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for RequestContext { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } impl RequestContext { - /// Sets the current request context for the PHP SAPI. - pub fn for_request(request: Request, docroot: S) + /// Creates a new RequestContext for handling a PHP request. + /// + /// Since SAPI is always in streaming mode (buffering happens at the NAPI Task layer), + /// we always set up streaming channels. The docroot and all state are + /// stored as Request extensions. + pub fn new

( + mut request: Request, + docroot: P, + response_body: http_handler::ResponseBody, + headers_sent_tx: oneshot::Sender<(u16, String, Vec<(String, String)>, Bytes)>, + ) -> Self where - S: Into, + P: AsRef, { - let context = Box::new(RequestContext { - request, - response_builder: http_handler::response::Builder::new(), - docroot: docroot.into(), + // Get the request body for SAPI to read from + let request_body = request.body().clone(); + + // Store all state in extensions + request.set_document_root(DocumentRoot { + path: docroot.as_ref().to_path_buf(), }); + + request.extensions_mut().insert(ResponseLog::new()); + + request.extensions_mut().insert(BodyBuffer::new()); + + request + .extensions_mut() + .insert(ResponseStream::new(response_body)); + + request + .extensions_mut() + .insert(HeadersSentTx::new(headers_sent_tx)); + + request + .extensions_mut() + .insert(RequestStream::new(request_body)); + + Self(request) + } + + /// Sets the current request context for the PHP SAPI. + pub fn set_current(context: Box) { let mut globals = SapiGlobals::get_mut(); globals.server_context = Box::into_raw(context) as *mut c_void; } @@ -50,36 +118,34 @@ impl RequestContext { Some(unsafe { Box::from_raw(ptr as *mut RequestContext) }) } - /// Returns a reference to the request. - pub fn request(&self) -> &Request { - &self.request - } - - /// Returns a mutable reference to the request. - pub fn request_mut(&mut self) -> &mut Request { - &mut self.request - } - - /// Returns a mutable reference to the response builder. - pub fn response_builder_mut(&mut self) -> &mut http_handler::response::Builder { - &mut self.response_builder + /// Signal that headers have been sent with status, mimetype, headers, and logs (streaming mode). + pub fn signal_headers_sent_with_data( + &mut self, + status: u16, + mimetype: String, + headers: Vec<(String, String)>, + logs: Bytes, + ) { + if let Some(headers_sent_tx) = self.extensions_mut().get_mut::() { + if let Ok(mut guard) = headers_sent_tx.0.try_lock() { + if let Some(tx) = guard.take() { + let _ = tx.send((status, mimetype, headers, logs)); + } + } + } } - /// Build the final response using the accumulated data. - pub fn build_response(mut self) -> Result { - // Extract the body buffer from extensions (if any was accumulated) - let body = self - .response_builder - .extensions_mut() - .and_then(|ext| ext.remove::()) - .unwrap_or_default() - .into_bytes_mut(); + /// Shutdown the response stream to signal EOF to response body consumers. + pub fn shutdown_response_stream(&mut self) { + use tokio::io::AsyncWriteExt; - self.response_builder.body(body) - } - - /// Returns the docroot associated with this request context - pub fn docroot(&self) -> PathBuf { - self.docroot.to_owned() + if let Some(response_stream) = self.extensions().get::() { + let mut body = response_stream.0.clone(); + // Spawn shutdown task without waiting - the stream will be closed asynchronously + // This avoids deadlocks when called from spawn_blocking context + crate::sapi::fallback_handle().spawn(async move { + let _ = body.shutdown().await; + }); + } } } diff --git a/src/sapi.rs b/src/sapi.rs index 8ca94a69..4e40ce5d 100644 --- a/src/sapi.rs +++ b/src/sapi.rs @@ -2,9 +2,12 @@ use std::{ collections::HashMap, env::current_exe, ffi::{c_char, c_int, c_void, CStr}, - sync::{Arc, RwLock, Weak}, + sync::{Arc, Mutex, RwLock, Weak}, + thread::ThreadId, }; +use bytes::Buf; + use ext_php_rs::{ alloc::{efree, estrdup}, builders::SapiBuilder, @@ -13,7 +16,7 @@ use ext_php_rs::{ }, // exception::register_error_observer, ffi::{ - php_module_shutdown, php_module_startup, php_register_variable, sapi_send_headers, + php_module_startup, php_register_variable, sapi_headers_struct, sapi_send_headers, sapi_shutdown, sapi_startup, ZEND_RESULT_CODE_SUCCESS, }, prelude::*, @@ -22,13 +25,33 @@ use ext_php_rs::{ use once_cell::sync::OnceCell; -use crate::{EmbedRequestError, EmbedStartError, RequestContext}; -use http_handler::ResponseBuilderExt; +use crate::{extensions::ResponseStream, EmbedRequestError, EmbedStartError, RequestContext}; +use http_handler::extensions::{BodyBuffer, ResponseLog}; +use http_handler::RequestExt; +use once_cell::sync::Lazy; + +// Fallback runtime for sapi callbacks running in blocking context +static FALLBACK_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime") +}); + +pub(crate) fn fallback_handle() -> &'static tokio::runtime::Handle { + FALLBACK_RUNTIME.handle() +} // This is a helper to ensure that PHP is initialized and deinitialized at the // appropriate times. #[derive(Debug)] -pub(crate) struct Sapi(RwLock>); +pub(crate) struct Sapi { + module: RwLock>, + // Track which thread created this Sapi so we only shutdown from that thread + creator_thread: ThreadId, + // Track if shutdown has been called to prevent double-shutdown + shutdown_called: Mutex, +} impl Sapi { pub fn new() -> Result { @@ -44,6 +67,7 @@ impl Sapi { .ub_write_function(sapi_module_ub_write) .flush_function(sapi_module_flush) .send_header_function(sapi_module_send_header) + .send_headers_function(sapi_module_send_headers) .read_post_function(sapi_module_read_post) .read_cookies_function(sapi_module_read_cookies) .register_server_variables_function(sapi_module_register_server_variables) @@ -85,7 +109,11 @@ impl Sapi { // } // }); - Ok(Sapi(RwLock::new(boxed))) + Ok(Sapi { + module: RwLock::new(boxed), + creator_thread: std::thread::current().id(), + shutdown_called: Mutex::new(false), + }) } pub fn startup(&self) -> Result<(), EmbedRequestError> { @@ -94,7 +122,7 @@ impl Sapi { } let sapi = &mut self - .0 + .module .write() .map_err(|_| EmbedRequestError::SapiNotStarted)?; @@ -108,8 +136,26 @@ impl Sapi { } pub fn shutdown(&self) -> Result<(), EmbedRequestError> { + // Only shutdown if we're on the same thread that created this Sapi + let current_thread = std::thread::current().id(); + if current_thread != self.creator_thread { + return Ok(()); + } + + // Prevent double-shutdown + let mut shutdown_called = self + .shutdown_called + .lock() + .map_err(|_| EmbedRequestError::SapiNotShutdown)?; + + if *shutdown_called { + return Ok(()); + } + + *shutdown_called = true; + let sapi = &mut self - .0 + .module .write() .map_err(|_| EmbedRequestError::SapiNotShutdown)?; @@ -125,11 +171,19 @@ impl Sapi { impl Drop for Sapi { fn drop(&mut self) { - self.shutdown().unwrap(); - - unsafe { - sapi_shutdown(); - ext_php_rs_sapi_shutdown(); + // Attempt shutdown, but it will be skipped if we're on the wrong thread + let _ = self.shutdown(); + + // Only call low-level shutdown functions if on the creator thread and shutdown succeeded + if std::thread::current().id() == self.creator_thread { + if let Ok(shutdown_called) = self.shutdown_called.lock() { + if *shutdown_called { + unsafe { + sapi_shutdown(); + ext_php_rs_sapi_shutdown(); + } + } + } } } } @@ -219,9 +273,13 @@ pub extern "C" fn sapi_module_startup( pub extern "C" fn sapi_module_shutdown( _sapi_module: *mut SapiModule, ) -> ext_php_rs::ffi::zend_result { - unsafe { - php_module_shutdown(); + // CRITICAL: Clear server_context BEFORE php_module_shutdown + // to prevent sapi_flush from accessing freed RequestContext + { + let mut globals = SapiGlobals::get_mut(); + globals.server_context = std::ptr::null_mut(); } + ZEND_RESULT_CODE_SUCCESS } @@ -260,16 +318,39 @@ fn maybe_efree(ptr: *mut u8) { #[no_mangle] pub extern "C" fn sapi_module_ub_write(str: *const c_char, str_length: usize) -> usize { + use tokio::io::AsyncWriteExt; + if str.is_null() || str_length == 0 { return 0; } - let bytes = unsafe { std::slice::from_raw_parts(str as *const u8, str_length) }; + // Send headers if not already sent (implicit header send on first output) + unsafe { + let globals = SapiGlobals::get(); + if globals.headers_sent == 0 { + sapi_send_headers(); + } + } + let bytes = unsafe { std::slice::from_raw_parts(str as *const u8, str_length) }; let len = bytes.len(); + if let Some(ctx) = RequestContext::current() { - ctx.response_builder_mut().append_body(bytes); + // Get ResponseStream from extensions + if let Some(response_stream) = ctx.extensions().get::() { + // Clone body to avoid holding ctx reference + let mut body = response_stream.0.clone(); + + // Use block_on to write asynchronously + let result = fallback_handle().block_on(async { body.write_all(bytes).await }); + + match result { + Ok(_) => return len, + Err(_) => return 0, // Write error + } + } } + len } @@ -279,57 +360,173 @@ pub extern "C" fn sapi_module_flush(_server_context: *mut c_void) { } #[no_mangle] -pub extern "C" fn sapi_module_send_header(header: *mut SapiHeader, _server_context: *mut c_void) { - // Not sure _why_ this is necessary, but it is. - if header.is_null() { - return; - } +pub extern "C" fn sapi_module_send_header(_header: *mut SapiHeader, _server_context: *mut c_void) { + // This is called by PHP for each header, but we don't need to track them here. + // PHP maintains its own list in sapi_headers_struct, which we read in + // sapi_module_send_headers() when headers are finalized. +} - let header = unsafe { &*header }; - let name = header.name(); - - // Header value is None for http version + status line - if let Some(value) = header.value() { - if let Some(ctx) = RequestContext::current() { - let builder = std::mem::replace( - ctx.response_builder_mut(), - http_handler::response::Builder::new(), - ); - let builder = builder.header(name, value); - *ctx.response_builder_mut() = builder; +#[no_mangle] +pub extern "C" fn sapi_module_send_headers(sapi_headers: *mut sapi_headers_struct) -> c_int { + use ext_php_rs::ffi::sapi_get_default_content_type; + use ext_php_rs::zend::SapiHeader; + + // Extract status and mimetype as owned types BEFORE leaving PHP thread + if let Some(ctx) = RequestContext::current() { + let (status, mimetype_owned) = { + let h = SapiGlobals::get().sapi_headers; + let mut mime = h.mimetype; + if mime.is_null() { + mime = unsafe { sapi_get_default_content_type() }; + } + + let mime_str = if !mime.is_null() { + unsafe { std::ffi::CStr::from_ptr(mime) } + .to_str() + .unwrap_or("text/html") + .to_owned() + } else { + "text/html".to_owned() + }; + + // Free the mimetype if it was allocated + if !mime.is_null() && mime != h.mimetype { + unsafe { efree(mime.cast::()) }; + } + + (h.http_response_code as u16, mime_str) + }; + + // Extract headers from sapi_headers_struct + let mut custom_headers = Vec::new(); + if !sapi_headers.is_null() { + let headers_list = unsafe { &(*sapi_headers).headers }; + + for header in headers_list.iter::() { + let name = header.name(); + if let Some(value) = header.value() { + custom_headers.push((name.to_string(), value.to_string())); + } + } } + + // Collect logs from ResponseLog extension + let logs = if let Some(log_ext) = ctx.extensions().get::() { + bytes::Bytes::copy_from_slice(log_ext.as_bytes()) + } else { + bytes::Bytes::new() + }; + + // Signal headers sent with owned data including custom headers and logs + ctx.signal_headers_sent_with_data(status, mimetype_owned.clone(), custom_headers, logs); } + + 1 // SAPI_HEADER_SENT_SUCCESSFULLY } #[no_mangle] pub extern "C" fn sapi_module_read_post(buffer: *mut c_char, length: usize) -> usize { + use tokio::io::AsyncReadExt; + if length == 0 { return 0; } - RequestContext::current() - .map(|ctx| { - let body = ctx.request_mut().body_mut(); - let actual_length = length.min(body.len()); - if actual_length == 0 { - return 0; + let result = RequestContext::current() + .and_then(|ctx| { + // Get or create BodyBuffer extension + let buffer_len = ctx + .extensions() + .get::() + .map(|b| b.len()) + .unwrap_or(0); + + // If buffer is empty and we have a request stream, wait for data + if buffer_len == 0 { + // Check if we have a request stream + if let Some(request_stream) = ctx.extensions().get::() { + // Clone body to avoid holding ctx reference + let mut body = request_stream.0.clone(); + + // Keep reading chunks until we have enough data or hit EOF + loop { + // Check current buffer size + let current_buffer_len = ctx + .extensions() + .get::() + .map(|b| b.len()) + .unwrap_or(0); + + // If we have enough data, stop reading + if current_buffer_len >= length { + break; + } + + // Read a chunk from the stream using block_on + let read_result = fallback_handle().block_on(async { + let mut chunk = vec![0u8; 8192]; + match body.read(&mut chunk).await { + Ok(0) => None, // EOF + Ok(n) => { + chunk.truncate(n); + Some(bytes::Bytes::from(chunk)) + } + Err(_) => None, // Error, treat as EOF + } + }); + + // Append data if we got some + if let Some(data) = read_result { + if !data.is_empty() { + // Get or insert BodyBuffer and append data + if let Some(body_buf) = ctx.extensions_mut().get_mut::() { + body_buf.append(&data); + } else { + ctx.extensions_mut().insert(BodyBuffer::from_bytes(data)); + } + } + } else { + // EOF reached + break; + } + } + } } - // Properly consume from the original body buffer - let chunk = body.split_to(actual_length); + // Now read from the buffer + // We need to consume data from the buffer, so we'll convert to BytesMut, split, then convert back + ctx + .extensions_mut() + .get_mut::() + .and_then(|body| { + let actual_length = length.min(body.len()); + if actual_length == 0 { + return None; + } - unsafe { - std::ptr::copy_nonoverlapping(chunk.as_ptr() as *mut c_char, buffer, actual_length); - } - actual_length + // Get the bytes as a slice and copy to output buffer + let bytes = body.as_bytes(); + unsafe { + std::ptr::copy_nonoverlapping(bytes.as_ptr() as *const c_char, buffer, actual_length); + } + + // Now we need to remove the consumed bytes - convert to BytesMut, split, convert back + let mut bytes_mut = std::mem::replace(body, BodyBuffer::new()).into_bytes_mut(); + bytes_mut.advance(actual_length); + *body = BodyBuffer::from_bytes(bytes_mut.freeze()); + + Some(actual_length) + }) }) - .unwrap_or(0) + .unwrap_or(0); + + result } #[no_mangle] pub extern "C" fn sapi_module_read_cookies() -> *mut c_char { RequestContext::current() - .map(|ctx| match ctx.request().headers().get("Cookie") { + .map(|ctx| match ctx.headers().get("Cookie") { Some(cookie) => estrdup(cookie.to_str().unwrap_or("")), None => std::ptr::null_mut(), }) @@ -377,9 +574,8 @@ pub extern "C" fn sapi_module_register_server_variables(vars: *mut ext_php_rs::t // } if let Some(ctx) = RequestContext::current() { - let request = ctx.request(); - let headers = request.headers(); - let uri = request.uri(); + let headers = ctx.headers(); + let uri = ctx.uri(); // Hack to allow ? syntax for the following code. // At the moment any errors are just swallowed, but these could be @@ -396,8 +592,11 @@ pub extern "C" fn sapi_module_register_server_variables(vars: *mut ext_php_rs::t let globals = SapiGlobals::get(); let req_info = &globals.request_info; - let docroot = ctx.docroot(); - let docroot_str = docroot.display().to_string(); + // Get docroot from DocumentRoot extension + let docroot_str = ctx + .document_root() + .map(|dr| dr.path.display().to_string()) + .unwrap_or_else(|| ".".to_string()); let script_filename = req_info.path_translated; let script_name = if !req_info.request_uri.is_null() { @@ -437,11 +636,11 @@ pub extern "C" fn sapi_module_register_server_variables(vars: *mut ext_php_rs::t env_var(vars, "SERVER_PROTOCOL", "HTTP/1.1")?; let sapi = get_sapi()?; - if let Ok(inner_sapi) = sapi.0.read() { + if let Ok(inner_sapi) = sapi.module.read() { env_var_c(vars, "SERVER_SOFTWARE", inner_sapi.name)?; } - if let Some(socket_info) = request.extensions().get::() { + if let Some(socket_info) = ctx.extensions().get::() { if let Some(local) = socket_info.local { env_var(vars, "SERVER_ADDR", local.ip().to_string())?; env_var(vars, "SERVER_PORT", local.port().to_string())?; @@ -474,7 +673,11 @@ pub extern "C" fn sapi_module_register_server_variables(vars: *mut ext_php_rs::t pub extern "C" fn sapi_module_log_message(message: *const c_char, _syslog_type_int: c_int) { let message = unsafe { CStr::from_ptr(message) }; if let Some(ctx) = RequestContext::current() { - ctx.response_builder_mut().append_log(message.to_bytes()); + // Append to ResponseLog extension for both streaming and buffered modes + // Note: PHP's SAPI adds the newline, so we just append the message + if let Some(log_ext) = ctx.extensions_mut().get_mut::() { + log_ext.append(message.to_bytes()); + } } } @@ -486,11 +689,9 @@ pub extern "C" fn sapi_module_log_message(message: *const c_char, _syslog_type_i pub fn apache_request_headers() -> Result, String> { let mut headers = HashMap::new(); - let request = RequestContext::current() - .map(|ctx| ctx.request()) - .ok_or("Request context unavailable")?; + let ctx = RequestContext::current().ok_or("Request context unavailable")?; - for (key, value) in request.headers().iter() { + for (key, value) in ctx.headers().iter() { headers.insert(key.to_string(), value.to_str().unwrap_or("").to_string()); }