Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
99.22% covered (success)
99.22%
127 / 128
94.74% covered (success)
94.74%
18 / 19
CRAP
0.00% covered (danger)
0.00%
0 / 1
ForkManager
99.22% covered (success)
99.22%
127 / 128
94.74% covered (success)
94.74%
18 / 19
72
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 isSupported
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
8
 fork
100.00% covered (success)
100.00%
17 / 17
100.00% covered (success)
100.00%
1 / 1
5
 wait
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 kill
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getMasterPid
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isMaster
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isWorker
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 registerSignalHandlers
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
2
 waitOnWorkers
100.00% covered (success)
100.00%
36 / 36
100.00% covered (success)
100.00%
1 / 1
15
 killWorkers
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
2
 detectPid
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
4
 resolveWorkers
100.00% covered (success)
100.00%
14 / 14
100.00% covered (success)
100.00%
1 / 1
6
 assertWorkerBelongsToManager
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
3
 hasRunningTargets
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 drainRunningWorkerOutputs
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
3
 collectExitedWorkers
94.74% covered (success)
94.74%
18 / 19
0.00% covered (danger)
0.00%
0 / 1
10.01
 isInterruptedWait
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
 isNoChildError
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2
3declare(strict_types=1);
4
5/**
6 * This file is part of fast-forward/fork.
7 *
8 * This source file is subject to the license bundled
9 * with this source code in the file LICENSE.
10 *
11 * @copyright Copyright (c) 2026 Felipe SayĆ£o Lobato Abreu <github@mentordosnerds.com>
12 * @license   https://opensource.org/licenses/MIT MIT License
13 *
14 * @see       https://github.com/php-fast-forward/fork
15 * @see       https://github.com/php-fast-forward
16 * @see       https://datatracker.ietf.org/doc/html/rfc2119
17 */
18
19namespace FastForward\Fork\Manager;
20
21use FastForward\Fork\Exception\InvalidArgumentException;
22use FastForward\Fork\Exception\LogicException;
23use FastForward\Fork\Exception\RuntimeException;
24use FastForward\Fork\Signal\DefaultSignalHandler;
25use FastForward\Fork\Signal\Signal;
26use FastForward\Fork\Signal\SignalHandlerInterface;
27use FastForward\Fork\Worker\Worker;
28use FastForward\Fork\Worker\WorkerGroup;
29use FastForward\Fork\Worker\WorkerGroupInterface;
30use FastForward\Fork\Worker\WorkerInterface;
31use FastForward\Fork\Worker\WorkerState;
32use Psr\Log\LoggerInterface;
33
34use function getmypid;
35use function usleep;
36
37/**
38 * Coordinates the lifecycle of worker processes using process forking.
39 *
40 * This class acts as the central orchestrator responsible for:
41 * - Creating worker processes
42 * - Managing their lifecycle and state
43 * - Handling inter-process communication via streams
44 * - Propagating and handling system signals
45 *
46 * Implementations MUST ensure that process control functions are available.
47 * The manager SHALL operate as a master process and MUST NOT be reused inside worker processes.
48 */
49 class ForkManager implements ForkManagerInterface
50{
51    /**
52     * POSIX error code for interrupted system calls.
53     */
54    private const int INTERRUPTED_SYSTEM_CALL_ERROR = 4;
55
56    /**
57     * POSIX error code for missing child processes.
58     */
59    private const int NO_CHILD_PROCESS_ERROR = 10;
60
61    /**
62     * Stores all worker instances indexed by their process ID (PID).
63     *
64     * Each worker instance MUST be uniquely identifiable by its PID.
65     *
66     * @var array<int, Worker>
67     */
68    private array $workersByPid = [];
69
70    /**
71     * Stores runtime state objects associated with each worker.
72     *
73     * Each state instance SHALL reflect the current lifecycle status of a worker.
74     *
75     * @var array<int, WorkerState>
76     */
77    private array $statesByPid = [];
78
79    /**
80     * PID of the master process that instantiated this manager.
81     *
82     * This value MUST remain immutable after initialization.
83     */
84    private  int $masterPid;
85
86    /**
87     * Initializes the manager, validates environment support, and registers signal handlers.
88     *
89     * The implementation MUST verify that all required extensions are available.
90     * If the environment does not support forking, a RuntimeException MUST be thrown.
91     *
92     * @param SignalHandlerInterface $signalHandler handler responsible for reacting to system signals
93     * @param ?LoggerInterface $logger logger used for lifecycle and output events
94     *
95     * @throws RuntimeException if the environment does not support process forking
96     */
97    public function __construct(
98        private  SignalHandlerInterface $signalHandler = new DefaultSignalHandler(),
99        private  ?LoggerInterface $logger = null,
100    ) {
101        // @codeCoverageIgnoreStart
102        if (! $this->isSupported()) {
103            throw RuntimeException::forUnsupportedForking();
104        }
105
106        // @codeCoverageIgnoreEnd
107
108        $this->masterPid = $this->detectPid();
109        $this->registerSignalHandlers();
110    }
111
112    /**
113     * Determines whether the current environment supports process forking.
114     *
115     * All required PHP functions MUST be available for safe execution.
116     *
117     * @return bool true if forking is supported; otherwise false
118     */
119    public function isSupported(): bool
120    {
121        return \function_exists('pcntl_async_signals')
122            && \function_exists('pcntl_fork')
123            && \function_exists('pcntl_signal')
124            && \function_exists('pcntl_waitpid')
125            && \function_exists('posix_getpid')
126            && \function_exists('posix_kill')
127            && \function_exists('stream_socket_pair')
128            && \function_exists('stream_select');
129    }
130
131    /**
132     * Forks one or more worker processes.
133     *
134     * The manager MUST NOT allow forking from within a worker process.
135     * The number of workers MUST be greater than zero.
136     *
137     * In case of failure during creation, all previously created workers SHALL be terminated.
138     *
139     * @param callable $workerCallback callback executed inside each worker process
140     * @param int $workerCount number of workers to spawn
141     *
142     * @return WorkerGroupInterface a group containing all created workers
143     *
144     * @throws LogicException if called from a worker process
145     * @throws InvalidArgumentException if worker count is invalid
146     * @throws RuntimeException if worker creation fails
147     */
148    public function fork(callable $workerCallback, int $workerCount = 1): WorkerGroupInterface
149    {
150        if ($this->isWorker()) {
151            throw LogicException::forForkFromWorkerProcess();
152        }
153
154        if ($workerCount < 1) {
155            throw InvalidArgumentException::forNonPositiveWorkerCount($workerCount);
156        }
157
158        $workers = [];
159
160        try {
161            for ($slot = 0; $slot < $workerCount; ++$slot) {
162                $state = WorkerState::create();
163
164                $worker = new Worker(
165                    manager: $this,
166                    state: $state,
167                    callback: $workerCallback,
168                    logger: $this->logger,
169                );
170
171                $workers[$worker->getPid()] = $worker;
172                $this->workersByPid[$worker->getPid()] = $worker;
173                $this->statesByPid[$worker->getPid()] = $state;
174            }
175
176            // @codeCoverageIgnoreStart
177        } catch (RuntimeException $runtimeException) {
178            $this->killWorkers($workers, Signal::Terminate);
179            $this->waitOnWorkers($workers);
180
181            throw $runtimeException;
182        }
183
184        // @codeCoverageIgnoreEnd
185
186        return new WorkerGroup($this, ...$workers);
187    }
188
189    /**
190     * Waits for one or more workers or worker groups to finish execution.
191     *
192     * If no workers are provided, the manager SHALL wait for all managed workers.
193     *
194     * @param WorkerInterface|WorkerGroupInterface ...$workers Workers or groups to wait for.
195     */
196    public function wait(WorkerInterface|WorkerGroupInterface ...$workers): void
197    {
198        $this->waitOnWorkers($this->resolveWorkers(...$workers));
199    }
200
201    /**
202     * Sends a signal to one or more workers.
203     *
204     * If no workers are provided, the signal SHALL be sent to all managed workers.
205     *
206     * @param Signal $signal signal to send (default: SIGTERM)
207     * @param WorkerInterface|WorkerGroupInterface ...$workers Target workers or groups.
208     */
209    public function kill(
210        Signal $signal = Signal::Terminate,
211        WorkerInterface|WorkerGroupInterface ...$workers,
212    ): void {
213        $this->killWorkers($this->resolveWorkers(...$workers), $signal);
214    }
215
216    /**
217     * Retrieves the PID of the master process.
218     *
219     * @return int the master process identifier
220     */
221    public function getMasterPid(): int
222    {
223        return $this->masterPid;
224    }
225
226    /**
227     * Determines whether the current process is the master process.
228     *
229     * @return bool true if master; otherwise false
230     */
231    public function isMaster(): bool
232    {
233        return $this->detectPid() === $this->masterPid;
234    }
235
236    /**
237     * Determines whether the current process is a worker process.
238     *
239     * @return bool true if worker; otherwise false
240     */
241    public function isWorker(): bool
242    {
243        return ! $this->isMaster();
244    }
245
246    /**
247     * Registers signal handlers for all configured signals.
248     *
249     * The implementation MUST enable asynchronous signal handling.
250     */
251    private function registerSignalHandlers(): void
252    {
253        pcntl_async_signals(true);
254
255        foreach ($this->signalHandler->signals() as $signal) {
256            pcntl_signal($signal->value, function (int $nativeSignal) use ($signal): void {
257                ($this->signalHandler)(
258                    $this,
259                    Signal::tryFrom($nativeSignal) ?? $signal,
260                );
261            });
262        }
263    }
264
265    /**
266     * Waits for the provided workers and processes their output streams.
267     *
268     * The method SHALL continuously monitor worker state and output until all workers terminate.
269     *
270     * @param array<int, Worker> $workers workers to monitor
271     */
272    private function waitOnWorkers(array $workers): void
273    {
274        if ([] === $workers) {
275            return;
276        }
277
278        $targetPids = array_fill_keys(array_keys($workers), true);
279
280        if ($this->isWorker() && isset($targetPids[$this->detectPid()])) {
281            throw LogicException::forWorkerWaitingOnItself($this->detectPid());
282        }
283
284        while ($this->hasRunningTargets($targetPids)) {
285            $this->drainRunningWorkerOutputs();
286            $this->collectExitedWorkers();
287
288            if (! $this->hasRunningTargets($targetPids)) {
289                break;
290            }
291
292            $readableStreams = [];
293            $streamMap = [];
294
295            foreach ($this->statesByPid as $state) {
296                if (! $state->isRunning()) {
297                    continue;
298                }
299
300                foreach ($state->getReadableStreams() as $stream) {
301                    $readableStreams[] = $stream;
302                    $streamMap[(int) $stream] = $state;
303                }
304            }
305
306            if ([] === $readableStreams) {
307                usleep(100_000);
308
309                continue;
310            }
311
312            $selectedStreams = $readableStreams;
313            $write = null;
314            $except = null;
315            $selected = @stream_select($selectedStreams, $write, $except, 0, 200_000);
316
317            // @codeCoverageIgnoreStart
318            if (false === $selected) {
319                continue;
320            }
321
322            // @codeCoverageIgnoreEnd
323
324            if ($selected > 0) {
325                $selectedByState = [];
326
327                foreach ($selectedStreams as $stream) {
328                    $state = $streamMap[(int) $stream] ?? null;
329
330                    if (! $state instanceof WorkerState) {
331                        continue;
332                    }
333
334                    $selectedByState[$state->getPid()][] = $stream;
335                }
336
337                foreach ($selectedByState as $pid => $streams) {
338                    $this->statesByPid[$pid]?->drainOutput(readableStreams: $streams, logger: $this->logger);
339                }
340            }
341
342            $this->collectExitedWorkers();
343        }
344
345        $this->drainRunningWorkerOutputs();
346    }
347
348    /**
349     * Sends a signal to all provided workers.
350     *
351     * @param array<int, Worker> $workers workers to signal
352     * @param Signal $signal signal to send
353     */
354    private function killWorkers(array $workers, Signal $signal): void
355    {
356        foreach ($workers as $worker) {
357            $worker->kill($signal);
358        }
359    }
360
361    /**
362     * Detects the current process ID (PID).
363     *
364     * The implementation SHOULD prefer POSIX functions when available.
365     * If detection fails, a RuntimeException MUST be thrown.
366     *
367     * @return int the current process identifier
368     *
369     * @throws RuntimeException if the PID cannot be determined
370     */
371    private function detectPid(): int
372    {
373        if (\function_exists('posix_getpid')) {
374            return posix_getpid();
375        }
376
377        // @codeCoverageIgnoreStart
378        $pid = getmypid();
379
380        if (! \is_int($pid) || $pid < 1) {
381            throw RuntimeException::forUndetectableProcessIdentifier();
382        }
383
384        return $pid;
385        // @codeCoverageIgnoreEnd
386    }
387
388    /**
389     * Resolves worker and group inputs into a unique set of workers.
390     *
391     * The method MUST validate ownership of each worker.
392     *
393     * @param WorkerInterface|WorkerGroupInterface ...$workers Targets to resolve.
394     *
395     * @return array<int, Worker>
396     *
397     * @throws InvalidArgumentException if a worker does not belong to this manager
398     */
399    private function resolveWorkers(WorkerInterface|WorkerGroupInterface ...$workers): array
400    {
401        if ([] === $workers) {
402            return $this->workersByPid;
403        }
404
405        $resolvedWorkers = [];
406
407        foreach ($workers as $target) {
408            if ($target instanceof WorkerGroupInterface) {
409                if ($target->getManager() !== $this) {
410                    throw InvalidArgumentException::forForeignWorkerGroup();
411                }
412
413                foreach ($target->all() as $worker) {
414                    $this->assertWorkerBelongsToManager($worker);
415                    $resolvedWorkers[$worker->getPid()] = $worker;
416                }
417
418                continue;
419            }
420
421            $this->assertWorkerBelongsToManager($target);
422            $resolvedWorkers[$target->getPid()] = $target;
423        }
424
425        return $resolvedWorkers;
426    }
427
428    /**
429     * Ensures that a worker belongs to this manager.
430     *
431     * The method MUST reject unsupported implementations and foreign workers.
432     *
433     * @param WorkerInterface $worker worker to validate
434     *
435     * @throws InvalidArgumentException if validation fails
436     */
437    private function assertWorkerBelongsToManager(WorkerInterface $worker): void
438    {
439        if (! $worker instanceof Worker) {
440            throw InvalidArgumentException::forUnsupportedWorkerImplementation($worker::class);
441        }
442
443        if (($this->workersByPid[$worker->getPid()] ?? null) === $worker) {
444            return;
445        }
446
447        throw InvalidArgumentException::forForeignWorker($worker->getPid());
448    }
449
450    /**
451     * Checks if any target worker is still running.
452     *
453     * @param array<int, true> $targetPids target process IDs
454     *
455     * @return bool true if at least one worker is running
456     */
457    private function hasRunningTargets(array $targetPids): bool
458    {
459        foreach (array_keys($targetPids) as $pid) {
460            if ($this->statesByPid[$pid]?->isRunning()) {
461                return true;
462            }
463        }
464
465        return false;
466    }
467
468    /**
469     * Drains output buffers from all running workers.
470     *
471     * This method SHOULD be called periodically to avoid blocking pipes.
472     */
473    private function drainRunningWorkerOutputs(): void
474    {
475        foreach ($this->statesByPid as $state) {
476            if (! $state->isRunning()) {
477                continue;
478            }
479
480            $state->drainOutput(logger: $this->logger);
481        }
482    }
483
484    /**
485     * Collects terminated child processes.
486     *
487     * The implementation MUST handle interrupted system calls and absence of child processes.
488     *
489     * @throws RuntimeException if waiting fails unexpectedly
490     */
491    private function collectExitedWorkers(): void
492    {
493        while (true) {
494            $status = 0;
495            $pid = pcntl_waitpid(-1, $status, \WNOHANG);
496
497            if ($pid > 0) {
498                $state = $this->statesByPid[$pid] ?? null;
499                if (! $state instanceof WorkerState) {
500                    continue;
501                }
502
503                if (! $state->isRunning()) {
504                    continue;
505                }
506
507                $state->markTerminated($status, $this->logger);
508
509                continue;
510            }
511
512            if (0 === $pid) {
513                return;
514            }
515
516            $error = pcntl_get_last_error();
517
518            // @codeCoverageIgnoreStart
519            if ($this->isInterruptedWait($error)) {
520                continue;
521            }
522
523            // @codeCoverageIgnoreEnd
524
525            if ($this->isNoChildError($error)) {
526                foreach ($this->statesByPid as $state) {
527                    if ($state->isRunning()) {
528                        $state->markDetached($this->logger);
529                    }
530                }
531
532                return;
533            }
534
535            // @codeCoverageIgnoreStart
536            throw RuntimeException::forWorkersWaitFailure(pcntl_strerror($error));
537            // @codeCoverageIgnoreEnd
538        }
539    }
540
541    /**
542     * Determines if a wait error was caused by an interrupted system call.
543     *
544     * @param int $error error code
545     *
546     * @return bool true if interrupted; otherwise false
547     */
548    private function isInterruptedWait(int $error): bool
549    {
550        return (\defined('PCNTL_EINTR') ? \PCNTL_EINTR : self::INTERRUPTED_SYSTEM_CALL_ERROR) === $error;
551    }
552
553    /**
554     * Determines if there are no remaining child processes.
555     *
556     * @param int $error error code
557     *
558     * @return bool true if no child processes remain
559     */
560    private function isNoChildError(int $error): bool
561    {
562        return (\defined('PCNTL_ECHILD') ? \PCNTL_ECHILD : self::NO_CHILD_PROCESS_ERROR) === $error;
563    }
564}