diff --git a/.gitignore b/.gitignore index 7c4f5ab..a34ded4 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ .phpunit.result.cache .phpunit.cache .env -/coverage \ No newline at end of file +/coverage +.DS_Store diff --git a/README.md b/README.md index ffb63f0..9e870a2 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,149 @@ If you're using separate services for dispatching and handling tasks, and your a 'disable_task_handler' => env('CLOUD_TASKS_DISABLE_TASK_HANDLER', false), ``` +### Cloud Run Jobs + +If you want jobs to be processed by Cloud Run Jobs instead of HTTP endpoints, you can configure the queue to trigger Cloud Run Job executions. + +#### Why Cloud Run Jobs? + +Cloud Run Jobs are ideal for long-running batch processing that exceeds Cloud Tasks HTTP timeout limits. + +Cloud Run Jobs can run for up to 7 days. + +**Tip**: Use seperate queue connections with different targets, for low latency jobs, use HTTP targets, for longer running batch jobs use Cloud Run Jobs. + +#### Setup + +1. **Create a Cloud Run Job** with your Laravel application container, configured to run: + +```bash +php artisan cloud-tasks:work-job +``` + +The command reads job data from environment variables passed to the Job by Cloud Run. + +2. **Configure your queue connection**: + +```php +'cloudtasks' => [ + 'driver' => 'cloudtasks', + 'project' => env('CLOUD_TASKS_PROJECT'), + 'location' => env('CLOUD_TASKS_LOCATION'), + 'queue' => env('CLOUD_TASKS_QUEUE', 'default'), + + // Cloud Run Job configuration + 'cloud_run_job' => env('CLOUD_TASKS_USE_CLOUD_RUN_JOB', false), + 'cloud_run_job_name' => env('CLOUD_RUN_JOB_NAME'), + 'cloud_run_job_region' => env('CLOUD_RUN_JOB_REGION'), // defaults to location + 'service_account_email' => env('CLOUD_TASKS_SERVICE_EMAIL'), + + // Optional: Store large payloads (>10KB) in filesystem + 'payload_disk' => env('CLOUD_TASKS_PAYLOAD_DISK'), // Laravel disk name + 'payload_prefix' => env('CLOUD_TASKS_PAYLOAD_PREFIX', 'cloud-tasks-payloads'), + 'payload_threshold' => env('CLOUD_TASKS_PAYLOAD_THRESHOLD', 10240), // bytes +], +``` + +> **Note**: The command reads `CLOUD_TASKS_PAYLOAD`, `CLOUD_TASKS_TASK_NAME`, and `CLOUD_TASKS_PAYLOAD_PATH` directly from environment variables at runtime using `getenv()`. These are set automatically by Cloud Tasks via container overrides. + +3. **Set environment variables**: + +```dotenv +CLOUD_TASKS_USE_CLOUD_RUN_JOB=true +CLOUD_RUN_JOB_NAME=my-queue-worker-job +CLOUD_RUN_JOB_REGION=europe-west1 +``` + +#### Large Payload Storage + +For jobs with payloads exceeding environment variable limits (32KB limit enforced by Cloud Run), configure a Laravel filesystem disk: + +```dotenv +CLOUD_TASKS_PAYLOAD_DISK=gcs +CLOUD_TASKS_PAYLOAD_PREFIX=cloud-tasks-payloads +CLOUD_TASKS_PAYLOAD_THRESHOLD=30000 +``` + +When the payload exceeds the threshold, it's stored in the disk and `CLOUD_TASKS_PAYLOAD_PATH` is used instead. + +> **Note**: The payloads will not be cleared up automatically, you can define lifecycle rules for the GCS bucket to delete old payloads. + +#### How It Works + +When you dispatch a job with Cloud Run Job target enabled: + +1. Package creates a Cloud Task with HTTP target pointing to Cloud Run Jobs API +2. Cloud Tasks calls `run.googleapis.com/v2/.../jobs/{job}:run` +3. Cloud Run Jobs starts a new execution with environment variables set via container overrides: + - `CLOUD_TASKS_PAYLOAD` - Base64-encoded job payload + - `CLOUD_TASKS_TASK_NAME` - The task name +4. The container runs `php artisan cloud-tasks:work-job` which reads the env vars and processes the job + +All Laravel queue functionality is retained: +- Job retries and max attempts +- Failed job handling +- Job timeouts +- Encrypted jobs +- Queue events + +#### Required IAM Permissions + +Cloud Run Jobs requires specific IAM permissions. Set these variables first: + +```bash +export PROJECT_ID="your-project-id" +export SA_EMAIL="your-service-account@your-project-id.iam.gserviceaccount.com" +export TASKS_AGENT="service-{PROJECT_NUMBER}@gcp-sa-cloudtasks.iam.gserviceaccount.com" +``` + +> **Note**: Find your Cloud Tasks service agent email in the IAM console under "Include Google-provided role grants". +> **Note**: Project ID and Project Number are different. Project ID is the name of your project, Project Number is the numeric ID of your project. + +**Project-Level Permissions:** + +```bash +# Allow enqueuing tasks (required by PHP app running as $SA_EMAIL) +gcloud projects add-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:$SA_EMAIL" \ + --role="roles/cloudtasks.enqueuer" + +# Allow executing jobs with overrides (required for container overrides) +gcloud projects add-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:$SA_EMAIL" \ + --role="roles/run.jobsExecutorWithOverrides" + +# Allow invoking Cloud Run Services (if also using Cloud Run Services as HTTP targets) +gcloud projects add-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:$SA_EMAIL" \ + --role="roles/run.invoker" +``` + +**Note**: To restrict access to specific Cloud Run instances, use IAM conditions to limit access to specific Cloud Run Jobs / services. + +**Service Account Permissions:** + +```bash +# Allow the SA to act as itself (required for task creation and execution) +gcloud iam service-accounts add-iam-policy-binding $SA_EMAIL \ + --member="serviceAccount:$SA_EMAIL" \ + --role="roles/iam.serviceAccountUser" + +# Allow Cloud Tasks to act as the SA (required for OAuth token generation) +gcloud iam service-accounts add-iam-policy-binding $SA_EMAIL \ + --member="serviceAccount:$TASKS_AGENT" \ + --role="roles/iam.serviceAccountUser" +``` + +| Permission | Required By | Purpose | +|------------|-------------|---------| +| `cloudtasks.enqueuer` | PHP App | Add tasks to the queue | +| `cloudtasks.viewer` | Cloud Run Job | List queues/tasks (optional) | +| `run.jobsExecutorWithOverrides` | Cloud Task | Execute jobs with container overrides | +| `run.invoker` | Other Workloads | Invoke Cloud Run Services (if using HTTP targets) | +| `iam.serviceAccountUser` (on SA) | Both | Allow SA to create tasks as itself | +| `iam.serviceAccountUser` (Tasks Agent) | Google Infrastructure | Generate OAuth tokens for Cloud Run | + ### How-To #### Pass headers to a task diff --git a/src/CloudTasksConnector.php b/src/CloudTasksConnector.php index f939170..862826e 100644 --- a/src/CloudTasksConnector.php +++ b/src/CloudTasksConnector.php @@ -19,7 +19,13 @@ * service_account_email?: string, * backoff?: int, * dispatch_deadline?: int, - * after_commit?: bool + * after_commit?: bool, + * cloud_run_job?: bool, + * cloud_run_job_name?: string, + * cloud_run_job_region?: string, + * payload_disk?: string, + * payload_prefix?: string, + * payload_threshold?: int * } */ class CloudTasksConnector implements ConnectorInterface diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index be377e8..b23780b 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -17,7 +17,9 @@ use Illuminate\Queue\WorkerOptions; use Google\Cloud\Tasks\V2\OidcToken; use Google\Cloud\Tasks\V2\HttpMethod; +use Google\Cloud\Tasks\V2\OAuthToken; use Google\Cloud\Tasks\V2\HttpRequest; +use Illuminate\Support\Facades\Storage; use Google\Cloud\Tasks\V2\AppEngineRouting; use Illuminate\Queue\Queue as LaravelQueue; use Google\Cloud\Tasks\V2\AppEngineHttpRequest; @@ -278,6 +280,45 @@ public function addPayloadToTask(array $payload, Task $task, $job): Task } $task->setAppEngineHttpRequest($appEngineRequest); + } elseif (! empty($this->config['cloud_run_job'])) { + // Cloud Run Job target - call the Cloud Run Jobs execution API + $httpRequest = new HttpRequest; + $httpRequest->setUrl($this->getCloudRunJobExecutionUrl()); + $httpRequest->setHttpMethod(HttpMethod::POST); + $httpRequest->setHeaders(array_merge($headers, [ + 'Content-Type' => 'application/json', + ])); + + // Build the execution request body with container overrides + // The job payload is passed as environment variables + $taskNameShort = str($task->getName())->afterLast('/')->toString(); + $encodedPayload = base64_encode(json_encode($payload)); + + // Build env vars for the container using fixed env var names + // These map to config keys: cloud_run_job_payload, cloud_run_job_task_name, cloud_run_job_payload_path + $envVars = $this->getCloudRunJobEnvVars($encodedPayload, $taskNameShort); + + $executionBody = [ + 'overrides' => [ + 'containerOverrides' => [ + [ + 'env' => $envVars, + ], + ], + ], + ]; + + $httpRequest->setBody(json_encode($executionBody)); + + $token = new OAuthToken; + $token->setServiceAccountEmail($this->config['service_account_email'] ?? ''); + $token->setScope('https://www.googleapis.com/auth/cloud-platform'); + $httpRequest->setOAuthToken($token); + $task->setHttpRequest($httpRequest); + + if (! empty($this->config['dispatch_deadline'])) { + $task->setDispatchDeadline((new Duration)->setSeconds($this->config['dispatch_deadline'])); + } } else { $httpRequest = new HttpRequest; $httpRequest->setUrl($this->getHandler($job)); @@ -367,4 +408,63 @@ private function getQueueForJob(mixed $job): string return $this->config['queue']; } + + /** + * Get the Cloud Run Jobs execution API URL. + */ + private function getCloudRunJobExecutionUrl(): string + { + $project = $this->config['project']; + $region = $this->config['cloud_run_job_region'] ?? $this->config['location']; + $jobName = $this->config['cloud_run_job_name'] ?? throw new Exception('cloud_run_job_name is required when using Cloud Run Jobs.'); + + return sprintf( + 'https://run.googleapis.com/v2/projects/%s/locations/%s/jobs/%s:run', + $project, + $region, + $jobName + ); + } + + /** + * Get the environment variables for Cloud Run Job dispatch. + * + * If the payload exceeds the configured threshold, it will be stored + * in the configured disk and the path will be returned instead. + * + * Env vars set map to config keys in the queue connection: + * - CLOUD_TASKS_TASK_NAME -> cloud_run_job_task_name + * - CLOUD_TASKS_PAYLOAD -> cloud_run_job_payload + * - CLOUD_TASKS_PAYLOAD_PATH -> cloud_run_job_payload_path + * + * @return array + */ + private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName): array + { + $disk = $this->config['payload_disk'] ?? null; + $threshold = $this->config['payload_threshold'] ?? 10240; // 10KB default + + $envVars = [ + ['name' => 'CLOUD_TASKS_TASK_NAME', 'value' => $taskName], + ]; + + // If no disk configured or payload is below threshold, pass payload directly + if ($disk === null || strlen($encodedPayload) <= $threshold) { + $envVars[] = ['name' => 'CLOUD_TASKS_PAYLOAD', 'value' => $encodedPayload]; + + return $envVars; + } + + // Store payload in configured disk and pass path instead + $prefix = $this->config['payload_prefix'] ?? 'cloud-tasks-payloads'; + $timestamp = now()->format('Y-m-d_H:i:s.v'); + $path = sprintf('%s/%s_%s.json', $prefix, $timestamp, $taskName); + + Storage::disk($disk)->put($path, $encodedPayload); + + // Set the path env var for large payloads + $envVars[] = ['name' => 'CLOUD_TASKS_PAYLOAD_PATH', 'value' => $disk.':'.$path]; + + return $envVars; + } } diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 1301f57..8e4833a 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -14,6 +14,7 @@ use Google\Cloud\Tasks\V2\Client\CloudTasksClient; use Stackkit\LaravelGoogleCloudTasksQueue\Events\JobReleased; use Illuminate\Support\ServiceProvider as LaravelServiceProvider; +use Stackkit\LaravelGoogleCloudTasksQueue\Commands\WorkCloudRunJob; class CloudTasksServiceProvider extends LaravelServiceProvider { @@ -24,6 +25,7 @@ public function boot(): void $this->registerConfig(); $this->registerRoutes(); $this->registerEvents(); + $this->registerCommands(); } private function registerClient(): void @@ -112,4 +114,13 @@ private function registerEvents(): void } }); } + + private function registerCommands(): void + { + if ($this->app->runningInConsole()) { + $this->commands([ + WorkCloudRunJob::class, + ]); + } + } } diff --git a/src/Commands/WorkCloudRunJob.php b/src/Commands/WorkCloudRunJob.php new file mode 100644 index 0000000..a882c5a --- /dev/null +++ b/src/Commands/WorkCloudRunJob.php @@ -0,0 +1,201 @@ +getEnvValue('CLOUD_TASKS_TASK_NAME'); + if ($taskName === null) { + $this->error('Required environment variable CLOUD_TASKS_TASK_NAME is not set.'); + + return self::FAILURE; + } + + // Get payload from environment - either direct or from path + $payload = $this->getPayload(); + if ($payload === null) { + $this->error('Required environment variable CLOUD_TASKS_PAYLOAD or CLOUD_TASKS_PAYLOAD_PATH is not set.'); + + return self::FAILURE; + } + + try { + $decodedPayload = base64_decode($payload); + $task = IncomingTask::fromJson($decodedPayload, $taskName); + } catch (Exception $e) { + $this->error('Failed to decode payload: '.$e->getMessage()); + + return self::FAILURE; + } + + // Get connection from the payload + $connectionName = $task->connection(); + + /** @var QueueConfig $config */ + $config = config('queue.connections.'.$connectionName); + + $client = app(CloudTasksClient::class); + $queue = tap(new CloudTasksQueue($config, $client))->setConnectionName($connectionName); + + $job = new CloudTasksJob( + container: Container::getInstance(), + driver: $queue, + job: $task->toArray(), + connectionName: $connectionName, + queue: $task->queue(), + ); + + $job->setAttempts($job->attempts() + 1); + + /** @var Worker $worker */ + $worker = app('cloud-tasks.worker'); + + // Use rescue to catch any errors during processing, similar to TaskHandler + $failed = false; + rescue(function () use ($worker, $job, $task, $config) { + $worker->process( + connectionName: $job->getConnectionName(), + job: $job, + options: CloudTasksQueue::getWorkerOptionsCallback() + ? (CloudTasksQueue::getWorkerOptionsCallback())($task) + : $this->getWorkerOptions($config) + ); + }, function () use (&$failed) { + $failed = true; + }); + + if ($failed || $job->hasFailed()) { + $this->error('Job processing failed.'); + + return self::FAILURE; + } + + $this->info('Job processed successfully.'); + + return self::SUCCESS; + } + + /** + * Get the payload from environment variable or storage. + */ + private function getPayload(): ?string + { + // First check for direct payload + $payload = $this->getEnvValue('CLOUD_TASKS_PAYLOAD'); + if ($payload !== null) { + return $payload; + } + + // Check for payload path (for large payloads stored in filesystem) + $payloadPath = $this->getEnvValue('CLOUD_TASKS_PAYLOAD_PATH'); + if ($payloadPath !== null) { + return $this->fetchPayloadFromStorage($payloadPath); + } + + return null; + } + + /** + * Get an environment variable value. + */ + private function getEnvValue(string $name): ?string + { + $value = getenv($name); + + return $value !== false && $value !== '' ? $value : null; + } + + /** + * Fetch payload from Laravel filesystem storage and clean up. + */ + private function fetchPayloadFromStorage(string $payloadPath): ?string + { + // Parse format: disk:path + if (! str_contains($payloadPath, ':')) { + $this->error('Invalid payload path format. Expected: disk:path'); + + return null; + } + + [$disk, $path] = explode(':', $payloadPath, 2); + + if (! Storage::disk($disk)->exists($path)) { + $this->error("Payload file not found: {$payloadPath}"); + + return null; + } + + $payload = Storage::disk($disk)->get($path); + + // Clean up the file after reading + Storage::disk($disk)->delete($path); + + return $payload; + } + + /** + * Get the worker options for the job. + * + * @param QueueConfig $config + */ + private function getWorkerOptions(array $config): WorkerOptions + { + $options = new WorkerOptions; + + if (isset($config['backoff'])) { + $options->backoff = $config['backoff']; + } + + return $options; + } +} diff --git a/src/IncomingTask.php b/src/IncomingTask.php index a4e675d..9135626 100644 --- a/src/IncomingTask.php +++ b/src/IncomingTask.php @@ -27,12 +27,14 @@ class IncomingTask /** * @param JobShape $task */ - private function __construct(private readonly array $task) - { + private function __construct( + private readonly array $task, + private readonly ?string $taskName = null + ) { // } - public static function fromJson(string $payload): self + public static function fromJson(string $payload, ?string $taskName = null): self { try { $decode = json_decode($payload, true); @@ -42,7 +44,7 @@ public static function fromJson(string $payload): self } /** @var JobShape $decode */ - return new self($decode); + return new self($decode, $taskName); } catch (JsonException) { throw new Exception('Invalid task payload.'); } @@ -66,6 +68,12 @@ public function queue(): string public function shortTaskName(): string { + // When running via CLI (Cloud Run Job), use the task name passed to constructor + if ($this->taskName !== null) { + return $this->taskName; + } + + // When running via HTTP, extract from headers return request()->header('X-CloudTasks-TaskName') ?? request()->header('X-AppEngine-TaskName') ?? throw new Error('Unable to extract taskname from header'); diff --git a/tests/CloudRunJobTest.php b/tests/CloudRunJobTest.php new file mode 100644 index 0000000..db09af5 --- /dev/null +++ b/tests/CloudRunJobTest.php @@ -0,0 +1,530 @@ +task->getHttpRequest() ?? $event->task->getAppEngineHttpRequest(); + $payload = $request->getBody(); + }); + + dispatch($job); + + return base64_encode((string) $payload); + } + + /** + * Set environment variables for testing the command. + */ + private function setEnvVars(string $payload, string $taskName): void + { + putenv('CLOUD_TASKS_PAYLOAD='.$payload); + putenv('CLOUD_TASKS_TASK_NAME='.$taskName); + } + + // ======================================== + // Command Execution Tests + // ======================================== + + #[Test] + public function it_can_run_a_job_via_the_command(): void + { + // Arrange + Event::fake(JobOutput::class); + $payload = $this->createPayload(new SimpleJob); + $this->setEnvVars($payload, 'test-task-name'); + + // Act + $this->artisan('cloud-tasks:work-job')->assertSuccessful(); + + // Assert + Event::assertDispatched(fn (JobOutput $event) => $event->output === 'SimpleJob:success'); + } + + #[Test] + public function it_extracts_connection_from_payload(): void + { + // Arrange + Event::fake(JobOutput::class); + + // Create a job with a specific connection + $job = new SimpleJob; + $job->connection = 'my-cloudtasks-connection'; + $payload = $this->createPayload($job); + $this->setEnvVars($payload, 'test-task-name'); + + // Act - no --connection needed, it extracts from payload + $this->artisan('cloud-tasks:work-job')->assertSuccessful(); + + // Assert + Event::assertDispatched(fn (JobOutput $event) => $event->output === 'SimpleJob:success'); + } + + #[Test] + public function it_fails_without_payload(): void + { + // Arrange + putenv('CLOUD_TASKS_TASK_NAME=test-task-name'); + + // Act & Assert + $this->artisan('cloud-tasks:work-job')->assertFailed(); + } + + #[Test] + public function it_fails_without_task_name(): void + { + // Arrange + $payload = $this->createPayload(new SimpleJob); + putenv('CLOUD_TASKS_PAYLOAD='.$payload); + + // Act & Assert + $this->artisan('cloud-tasks:work-job')->assertFailed(); + } + + #[Test] + public function it_fails_with_invalid_payload(): void + { + // Arrange + $this->setEnvVars('not-valid-base64!!!', 'test-task-name'); + + // Act & Assert + $this->artisan('cloud-tasks:work-job')->assertFailed(); + } + + #[Test] + public function it_handles_failing_jobs(): void + { + // Arrange + Event::fake(JobOutput::class); + $payload = $this->createPayload(new FailingJob); + $this->setEnvVars($payload, 'test-task-name'); + + // Act + $this->artisan('cloud-tasks:work-job'); + + // Assert - The job should process but the command may return failure due to exception + Event::assertDispatched(JobOutput::class); + } + + #[Test] + public function it_can_handle_encrypted_jobs(): void + { + // Arrange + Event::fake(JobOutput::class); + $payload = $this->createPayload(new EncryptedJob); + $this->setEnvVars($payload, 'test-task-name'); + + // Act + $this->artisan('cloud-tasks:work-job')->assertSuccessful(); + + // Assert + Event::assertDispatched(fn (JobOutput $event) => $event->output === 'EncryptedJob:success'); + } + + #[Test] + public function uses_worker_options_callback(): void + { + // Arrange + Event::fake(JobOutput::class); + CloudTasksQueue::configureWorkerOptionsUsing(function (IncomingTask $task) { + return new WorkerOptions(maxTries: 10); + }); + + $payload = $this->createPayload(new SimpleJob); + $this->setEnvVars($payload, 'test-task-name'); + + // Act + $this->artisan('cloud-tasks:work-job')->assertSuccessful(); + + // Assert + Event::assertDispatched(fn (JobOutput $event) => $event->output === 'SimpleJob:success'); + } + + // ======================================== + // Cloud Run Job Dispatch Tests + // ======================================== + + #[Test] + public function cloud_run_job_target_creates_http_request_to_run_api(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + $this->setConfigValue('cloud_run_job_region', 'europe-west1'); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $url = $task->getHttpRequest()->getUrl(); + + return $url === 'https://run.googleapis.com/v2/projects/my-test-project/locations/europe-west1/jobs/my-worker-job:run'; + }); + } + + #[Test] + public function cloud_run_job_target_uses_location_as_default_region(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + // Not setting cloud_run_job_region - should default to location + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $url = $task->getHttpRequest()->getUrl(); + + // Should use 'europe-west6' from location config + return str_contains($url, 'europe-west6'); + }); + } + + #[Test] + public function cloud_run_job_target_posts_with_post_method(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + return $task->getHttpRequest()->getHttpMethod() === HttpMethod::POST; + }); + } + + #[Test] + public function cloud_run_job_target_includes_container_overrides_with_env_vars(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $body = json_decode($task->getHttpRequest()->getBody(), true); + + // Check that overrides with containerOverrides.env exists + return isset($body['overrides']['containerOverrides'][0]['env']); + }); + } + + #[Test] + public function cloud_run_job_target_includes_base64_encoded_payload_in_env(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $body = json_decode($task->getHttpRequest()->getBody(), true); + $envVars = $body['overrides']['containerOverrides'][0]['env'] ?? []; + + // Find the payload env var + foreach ($envVars as $env) { + if ($env['name'] === 'CLOUD_TASKS_PAYLOAD') { + $decoded = base64_decode($env['value']); + + return $decoded !== false && json_decode($decoded, true) !== null; + } + } + + return false; + }); + } + + #[Test] + public function cloud_run_job_target_includes_task_name_in_env(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $body = json_decode($task->getHttpRequest()->getBody(), true); + $envVars = $body['overrides']['containerOverrides'][0]['env'] ?? []; + + // Find the task name env var + foreach ($envVars as $env) { + if ($env['name'] === 'CLOUD_TASKS_TASK_NAME') { + return ! empty($env['value']); + } + } + + return false; + }); + } + + #[Test] + public function cloud_run_job_target_sets_oauth_token_with_correct_scope(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $oauthToken = $task->getHttpRequest()->getOAuthToken(); + + return $oauthToken->getScope() === 'https://www.googleapis.com/auth/cloud-platform'; + }); + } + + #[Test] + public function cloud_run_job_target_respects_dispatch_deadline(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + $this->setConfigValue('dispatch_deadline', 1800); + + // Act + $this->dispatch(new SimpleJob); + + // Assert + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + return $task->getDispatchDeadline()->getSeconds() === 1800; + }); + } + + // ======================================== + // IncomingTask Tests for CLI Context + // ======================================== + + #[Test] + public function incoming_task_returns_task_name_from_constructor(): void + { + // Arrange + $payload = json_encode([ + 'displayName' => 'SimpleJob', + 'job' => 'Illuminate\\Queue\\CallQueuedHandler@call', + 'data' => [ + 'command' => serialize(new SimpleJob), + ], + 'internal' => [ + 'attempts' => 0, + ], + ]); + + // Act + $task = IncomingTask::fromJson($payload, 'my-custom-task-name'); + + // Assert + $this->assertEquals('my-custom-task-name', $task->shortTaskName()); + } + + #[Test] + public function incoming_task_extracts_connection_from_payload(): void + { + // Arrange + $job = new SimpleJob; + $job->connection = 'my-cloudtasks-connection'; + + $payload = json_encode([ + 'displayName' => 'SimpleJob', + 'job' => 'Illuminate\\Queue\\CallQueuedHandler@call', + 'data' => [ + 'command' => serialize($job), + ], + 'internal' => [ + 'attempts' => 0, + ], + ]); + + // Act + $task = IncomingTask::fromJson($payload, 'test-task'); + + // Assert + $this->assertEquals('my-cloudtasks-connection', $task->connection()); + } + + // ======================================== + // GCS Payload Storage Tests + // ======================================== + + #[Test] + public function payload_below_threshold_is_passed_directly_in_env(): void + { + // Arrange + CloudTasksApi::fake(); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + $this->setConfigValue('payload_disk', 'local'); + $this->setConfigValue('payload_threshold', 100000); // 100KB threshold + + // Act + $this->dispatch(new SimpleJob); + + // Assert - should use CLOUD_TASKS_PAYLOAD directly since payload is below threshold + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $body = json_decode($task->getHttpRequest()->getBody(), true); + $envVars = $body['overrides']['containerOverrides'][0]['env'] ?? []; + + foreach ($envVars as $env) { + if ($env['name'] === 'CLOUD_TASKS_PAYLOAD') { + return true; + } + } + + return false; + }); + } + + #[Test] + public function payload_above_threshold_is_stored_in_disk(): void + { + // Arrange + CloudTasksApi::fake(); + Storage::fake('local'); + $this->setConfigValue('cloud_run_job', true); + $this->setConfigValue('cloud_run_job_name', 'my-worker-job'); + $this->setConfigValue('payload_disk', 'local'); + $this->setConfigValue('payload_prefix', 'payloads'); + $this->setConfigValue('payload_threshold', 1); // 1 byte threshold + + // Act + $this->dispatch(new SimpleJob); + + // Assert - should use CLOUD_TASKS_PAYLOAD_PATH since payload exceeds threshold + CloudTasksApi::assertTaskCreated(function (Task $task): bool { + $body = json_decode($task->getHttpRequest()->getBody(), true); + $envVars = $body['overrides']['containerOverrides'][0]['env'] ?? []; + + foreach ($envVars as $env) { + if ($env['name'] === 'CLOUD_TASKS_PAYLOAD_PATH') { + return true; + } + } + + return false; + }); + + // Assert file was created in payloads directory + $files = Storage::disk('local')->files('payloads'); + $this->assertNotEmpty($files, 'Payload file was not created in storage'); + } + + #[Test] + public function worker_can_process_job_from_payload_path(): void + { + // Arrange + Event::fake(JobOutput::class); + Storage::fake('local'); + $payload = $this->createPayload(new SimpleJob); + $path = 'cloud-tasks-payloads/test-task.json'; + + // Store payload in fake storage + Storage::disk('local')->put($path, $payload); + + // Set env vars for path-based payload + putenv('CLOUD_TASKS_TASK_NAME=test-task-name'); + putenv('CLOUD_TASKS_PAYLOAD_PATH=local:'.$path); + + // Act + $this->artisan('cloud-tasks:work-job')->assertSuccessful(); + + // Assert + Event::assertDispatched(fn (JobOutput $event) => $event->output === 'SimpleJob:success'); + + // Assert file was cleaned up + Storage::disk('local')->assertMissing($path); + } + + #[Test] + public function worker_fails_with_invalid_payload_path_format(): void + { + // Arrange + putenv('CLOUD_TASKS_TASK_NAME=test-task-name'); + putenv('CLOUD_TASKS_PAYLOAD_PATH=invalid-format-no-colon'); + + // Act & Assert + $this->artisan('cloud-tasks:work-job')->assertFailed(); + } + + #[Test] + public function worker_fails_when_payload_file_not_found(): void + { + // Arrange + Storage::fake('local'); + putenv('CLOUD_TASKS_TASK_NAME=test-task-name'); + putenv('CLOUD_TASKS_PAYLOAD_PATH=local:non-existent-file.json'); + + // Act & Assert + $this->artisan('cloud-tasks:work-job')->assertFailed(); + } +}