Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
100.00% covered (success)
100.00%
90 / 90
100.00% covered (success)
100.00%
19 / 19
CRAP
100.00% covered (success)
100.00%
1 / 1
Worker
100.00% covered (success)
100.00%
90 / 90
100.00% covered (success)
100.00%
19 / 19
44
100.00% covered (success)
100.00%
1 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getPid
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isRunning
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getStatus
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getExitCode
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getTerminationSignal
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getOutput
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getErrorOutput
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 wait
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
3
 kill
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
4
 startForkedWorker
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
3
 executeCallback
100.00% covered (success)
100.00%
31 / 31
100.00% covered (success)
100.00%
1 / 1
5
 normalizeExitCode
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
3
 pollState
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
7
 isCurrentWorkerProcess
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 detectCurrentPid
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
3
 isInterruptedWait
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 isNoChildError
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 isNoSuchProcessError
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2
3declare(strict_types=1);
4
5/**
6 * This file is part of fast-forward/fork.
7 *
8 * This source file is subject to the license bundled
9 * with this source code in the file LICENSE.
10 *
11 * @copyright Copyright (c) 2026 Felipe Sayão Lobato Abreu <github@mentordosnerds.com>
12 * @license   https://opensource.org/licenses/MIT MIT License
13 *
14 * @see       https://github.com/php-fast-forward/fork
15 * @see       https://github.com/php-fast-forward
16 * @see       https://datatracker.ietf.org/doc/html/rfc2119
17 */
18
19namespace FastForward\Fork\Worker;
20
21use Closure;
22use FastForward\Fork\Exception\LogicException;
23use FastForward\Fork\Exception\RuntimeException;
24use FastForward\Fork\Manager\ForkManagerInterface;
25use FastForward\Fork\Signal\Signal;
26use Psr\Log\LoggerInterface;
27use Throwable;
28
29use function error_reporting;
30use function getmypid;
31use function max;
32use function min;
33use function ob_end_flush;
34use function ob_get_level;
35use function ob_start;
36use function pcntl_fork;
37use function posix_get_last_error;
38use function pcntl_waitpid;
39use function posix_kill;
40use function restore_error_handler;
41use function set_error_handler;
42
/**
 * Represents a single forked worker process.
 *
 * This class encapsulates the lifecycle of a worker created via process forking.
 * It is responsible for:
 * - Spawning the child process
 * - Executing the provided callback in the child context
 * - Capturing output and errors
 * - Synchronizing state with the master process
 *
 * Instances of this class MUST be created exclusively by a manager implementation.
 * Consumers SHOULD NOT instantiate this class directly.
 */
class Worker implements WorkerInterface
{
    /**
     * POSIX error code for interrupted system calls.
     *
     * Used only as a fallback when the PCNTL_EINTR constant is not defined.
     */
    private const int INTERRUPTED_SYSTEM_CALL_ERROR = 4;

    /**
     * POSIX error code for missing child processes.
     *
     * Used only as a fallback when the PCNTL_ECHILD constant is not defined.
     */
    private const int NO_CHILD_PROCESS_ERROR = 10;

    /**
     * POSIX error code for non-existent processes.
     *
     * Used only as a fallback when the POSIX_ESRCH constant is not defined.
     */
    private const int NO_SUCH_PROCESS_ERROR = 3;

    /**
     * Exit code reported when a worker fails without a usable exception code.
     *
     * It is also the upper bound applied to exception codes, because a process
     * exit status only carries eight bits.
     */
    private const int FAILURE_EXIT_CODE = 255;

    /**
     * Stores the callback executed inside the child process.
     *
     * The callback MUST accept the current worker instance as its only argument.
     *
     * @var Closure(WorkerInterface):mixed
     */
    private Closure $callback;

    /**
     * Initializes the worker and immediately forks the process.
     *
     * The constructor MUST trigger process forking and configure both parent
     * and child execution contexts accordingly. In the child process the
     * constructor never returns: the callback is executed and the process exits.
     *
     * @param ForkManagerInterface $manager manager responsible for this worker
     * @param WorkerState $state shared mutable state between parent and child
     * @param callable(WorkerInterface): mixed $callback callback executed in the child process
     * @param ?LoggerInterface $logger logger used for lifecycle events
     *
     * @throws RuntimeException if the fork operation fails
     */
    public function __construct(
        private ForkManagerInterface $manager,
        private WorkerState $state,
        callable $callback,
        private ?LoggerInterface $logger = null,
    ) {
        $this->callback = Closure::fromCallable($callback);
        $this->startForkedWorker();
    }

    /**
     * {@inheritDoc}
     */
    public function getPid(): int
    {
        return $this->state->getPid();
    }

    /**
     * {@inheritDoc}
     */
    public function isRunning(): bool
    {
        $this->pollState();

        return $this->state->isRunning();
    }

    /**
     * {@inheritDoc}
     */
    public function getStatus(): ?int
    {
        $this->pollState();

        return $this->state->getStatus();
    }

    /**
     * {@inheritDoc}
     */
    public function getExitCode(): ?int
    {
        $this->pollState();

        return $this->state->getExitCode();
    }

    /**
     * {@inheritDoc}
     */
    public function getTerminationSignal(): ?Signal
    {
        $this->pollState();

        return $this->state->getTerminationSignal();
    }

    /**
     * {@inheritDoc}
     */
    public function getOutput(): string
    {
        $this->pollState();

        return $this->state->getOutput();
    }

    /**
     * {@inheritDoc}
     */
    public function getErrorOutput(): string
    {
        $this->pollState();

        return $this->state->getErrorOutput();
    }

    /**
     * {@inheritDoc}
     *
     * This method SHALL block until the worker finishes execution.
     * It MUST NOT be invoked from within the same worker process.
     *
     * @throws LogicException if the worker attempts to wait on itself
     */
    public function wait(): void
    {
        if (! $this->state->isRunning()) {
            return;
        }

        if ($this->isCurrentWorkerProcess()) {
            throw LogicException::forWorkerWaitingOnItself($this->state->getPid());
        }

        $this->manager->wait($this);
    }

    /**
     * {@inheritDoc}
     *
     * This method attempts to send a POSIX signal to the worker process.
     * If the process no longer exists, the worker SHALL be marked as detached.
     * Any other delivery failure leaves the worker state unchanged.
     */
    public function kill(Signal $signal = Signal::Terminate): void
    {
        if (! $this->state->isRunning()) {
            return;
        }

        if (posix_kill($this->state->getPid(), $signal->value)) {
            return;
        }

        if ($this->isNoSuchProcessError(posix_get_last_error())) {
            $this->state->markDetached($this->logger);
        }
    }

    /**
     * Forks the current process and initializes parent/child execution paths.
     *
     * The parent process SHALL retain control and track the worker.
     * The child process MUST execute the provided callback and terminate.
     *
     * @throws RuntimeException if the fork operation fails
     */
    private function startForkedWorker(): void
    {
        $pid = pcntl_fork();

        // @codeCoverageIgnoreStart
        if (-1 === $pid) {
            throw RuntimeException::forUnableToForkWorker();
        }

        // @codeCoverageIgnoreEnd

        if ($pid > 0) {
            // Parent side: pcntl_fork() returned the child's PID.
            $this->state->activateParent($pid);

            $this->logger?->info('Forked worker process.', [
                'worker_pid' => $pid,
            ]);

            return;
        }

        // Child side: pcntl_fork() returned 0, so the real PID must be looked up.
        // @codeCoverageIgnoreStart
        $pid = $this->detectCurrentPid();
        $this->state->activateChild($pid);

        pcntl_async_signals(true);

        $this->logger?->info('Starting forked worker execution.', [
            'worker_pid' => $pid,
        ]);

        exit($this->executeCallback());
        // @codeCoverageIgnoreEnd
    }

    /**
     * Executes the worker callback and normalizes the exit code.
     *
     * Output buffering MUST be used to capture standard output and forward it
     * to the parent process. Errors SHALL be intercepted and written to the
     * worker error output channel.
     *
     * @return int normalized process exit code (0–255)
     */
    private function executeCallback(): int
    {
        $bufferLevel = ob_get_level();

        set_error_handler(function (int $severity, string $message, string $file, int $line): bool {
            // Respect the active error_reporting mask (this also covers "@" suppression).
            if ((error_reporting() & $severity) === 0) {
                return false;
            }

            $this->state->writeErrorOutput(\sprintf(
                "[fast-forward/fork] PHP error %d: %s in %s on line %d\n",
                $severity,
                $message,
                $file,
                $line,
            ));

            return true;
        });

        // A chunk size of 1 makes PHP hand output to the callback as soon as it is produced.
        ob_start(function (string $buffer): string {
            $this->state->writeOutput($buffer);

            return '';
        }, 1);

        try {
            $result = ($this->callback)($this);

            return $this->normalizeExitCode($result);
        } catch (Throwable $throwable) {
            $this->state->writeErrorOutput(\sprintf(
                "[fast-forward/fork] Worker %d failed with %s: %s\n",
                $this->state->getPid(),
                $throwable::class,
                $throwable->getMessage(),
            ));

            return $this->resolveFailureExitCode($throwable);
        } finally {
            while (ob_get_level() > $bufferLevel) {
                // Stop when a buffer cannot be flushed and removed; otherwise the
                // level would never drop and this loop would spin forever.
                if (! ob_end_flush()) {
                    break;
                }
            }

            restore_error_handler();
            $this->state->closeChildSide();
        }
    }

    /**
     * Derives the exit code for a worker whose callback threw.
     *
     * A positive integer exception code is used as the exit code, capped at 255
     * so that it cannot wrap around to 0 (success) once the operating system
     * truncates the status to eight bits. Non-integer codes (some exceptions
     * expose string codes) and non-positive codes yield the generic failure code.
     *
     * @param Throwable $throwable failure raised by the worker callback
     *
     * @return int exit code in the range 1–255
     */
    private function resolveFailureExitCode(Throwable $throwable): int
    {
        $code = $throwable->getCode();

        if (! \is_int($code) || $code < 1) {
            return self::FAILURE_EXIT_CODE;
        }

        return min(self::FAILURE_EXIT_CODE, $code);
    }

    /**
     * Normalizes a callback result into a valid exit code.
     *
     * Integers MUST be clamped to the range 0–255.
     * A boolean false SHALL be converted to exit code 1.
     * Any other value SHALL result in exit code 0.
     *
     * @param mixed $result callback return value
     *
     * @return int normalized exit code
     */
    private function normalizeExitCode(mixed $result): int
    {
        if (\is_int($result)) {
            return max(0, min(255, $result));
        }

        if (false === $result) {
            return 1;
        }

        return 0;
    }

    /**
     * Performs a non-blocking state synchronization with the worker process.
     *
     * This method MUST only operate in the master process context; it returns
     * immediately when the worker is no longer running or when it is invoked
     * from inside the worker process itself.
     *
     * @throws RuntimeException if waiting for the worker fails unexpectedly
     */
    private function pollState(): void
    {
        if (! $this->state->isRunning() || $this->isCurrentWorkerProcess()) {
            return;
        }

        $this->state->drainOutput(logger: $this->logger);

        $status = 0;
        $pid = pcntl_waitpid($this->state->getPid(), $status, \WNOHANG);

        if ($pid > 0) {
            // The child changed state and was reaped.
            $this->state->markTerminated($status, $this->logger);

            return;
        }

        if (0 === $pid) {
            // WNOHANG: the child is still running.
            return;
        }

        $error = pcntl_get_last_error();

        // @codeCoverageIgnoreStart
        if ($this->isInterruptedWait($error)) {
            return;
        }

        // @codeCoverageIgnoreEnd

        if ($this->isNoChildError($error)) {
            $this->state->markDetached($this->logger);

            return;
        }

        // @codeCoverageIgnoreStart
        throw RuntimeException::forWorkerWaitFailure($this->state->getPid(), pcntl_strerror($error));
        // @codeCoverageIgnoreEnd
    }

    /**
     * Determines whether the current process corresponds to this worker.
     *
     * @return bool true if executing inside this worker process
     */
    private function isCurrentWorkerProcess(): bool
    {
        return $this->manager->isWorker()
            && $this->detectCurrentPid() === $this->state->getPid();
    }

    /**
     * Detects the current process identifier (PID).
     *
     * @return int the current process ID
     *
     * @throws RuntimeException if the PID cannot be determined
     */
    private function detectCurrentPid(): int
    {
        $pid = getmypid();

        // @codeCoverageIgnoreStart
        if (! \is_int($pid) || $pid < 1) {
            throw RuntimeException::forUndetectableProcessIdentifier();
        }

        // @codeCoverageIgnoreEnd

        return $pid;
    }

    /**
     * Determines whether a wait failure was caused by an interrupted system call.
     *
     * @param int $error error code returned by pcntl
     *
     * @return bool true if interrupted
     */
    private function isInterruptedWait(int $error): bool
    {
        return (\defined('PCNTL_EINTR') ? \PCNTL_EINTR : self::INTERRUPTED_SYSTEM_CALL_ERROR) === $error;
    }

    /**
     * Determines whether no child processes remain.
     *
     * @param int $error error code returned by pcntl
     *
     * @return bool true if no child processes exist
     */
    private function isNoChildError(int $error): bool
    {
        return (\defined('PCNTL_ECHILD') ? \PCNTL_ECHILD : self::NO_CHILD_PROCESS_ERROR) === $error;
    }

    /**
     * Determines whether the process no longer exists.
     *
     * @param int $error error code returned by POSIX APIs
     *
     * @return bool true if the target process is gone
     */
    private function isNoSuchProcessError(int $error): bool
    {
        return (\defined('POSIX_ESRCH') ? \POSIX_ESRCH : self::NO_SUCH_PROCESS_ERROR) === $error;
    }
}