Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
100.00% covered (success)
100.00%
81 / 81
100.00% covered (success)
100.00%
14 / 14
CRAP
100.00% covered (success)
100.00%
1 / 1
WorkerOutputTransport
100.00% covered (success)
100.00%
81 / 81
100.00% covered (success)
100.00%
14 / 14
37
100.00% covered (success)
100.00%
1 / 1
 __construct
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 create
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
5
 activateParentSide
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 activateChildSide
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getReadableStreams
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
3
 drain
100.00% covered (success)
100.00%
24 / 24
100.00% covered (success)
100.00%
1 / 1
1
 writeOutput
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 writeErrorOutput
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 closeChildSide
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getOutput
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getErrorOutput
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 writeTo
100.00% covered (success)
100.00%
8 / 8
100.00% covered (success)
100.00%
1 / 1
6
 readFrom
100.00% covered (success)
100.00%
16 / 16
100.00% covered (success)
100.00%
1 / 1
10
 closeStream
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
2
1<?php
2
3declare(strict_types=1);
4
5/**
6 * This file is part of fast-forward/fork.
7 *
8 * This source file is subject to the license bundled
9 * with this source code in the file LICENSE.
10 *
11 * @copyright Copyright (c) 2026 Felipe Sayão Lobato Abreu <github@mentordosnerds.com>
12 * @license   https://opensource.org/licenses/MIT MIT License
13 *
14 * @see       https://github.com/php-fast-forward/fork
15 * @see       https://github.com/php-fast-forward
16 * @see       https://datatracker.ietf.org/doc/html/rfc2119
17 */
18
19namespace FastForward\Fork\Worker;
20
21use Closure;
22use FastForward\Fork\Exception\RuntimeException;
23use Psr\Log\LoggerInterface;
24
25use function fclose;
26use function feof;
27use function fwrite;
28use function stream_get_contents;
29use function stream_set_blocking;
30use function stream_socket_pair;
31use function substr;
32
/**
 * Manages the socket-based transport used to stream worker stdout and error output back to the parent.
 *
 * This component is responsible for:
 * - Creating independent socket pairs for stdout and stderr
 * - Coordinating parent/child stream ownership
 * - Streaming data between processes
 * - Accumulating output buffers
 *
 * Instances of this class MUST be created via the factory method to ensure
 * proper resource initialization.
 *
 * @internal
 */
class WorkerOutputTransport
{
    /**
     * Accumulates stdout captured from the worker process.
     */
    private string $output = '';

    /**
     * Accumulates error output captured from the worker process.
     */
    private string $errorOutput = '';

    /**
     * Initializes the transport with reader/writer socket pairs.
     *
     * The constructor is private; instances are obtained through {@see self::create()}.
     * Each property is reset to null by {@see self::closeStream()} once its stream is closed.
     *
     * @param resource|null $outputReader reader socket used by the parent process for stdout
     * @param resource|null $outputWriter writer socket used by the child process for stdout
     * @param resource|null $errorReader reader socket used by the parent process for error output
     * @param resource|null $errorWriter writer socket used by the child process for error output
     */
    private function __construct(
        private $outputReader,
        private $outputWriter,
        private $errorReader,
        private $errorWriter,
    ) {}

    /**
     * Creates a fresh transport with independent stdout and error socket pairs.
     *
     * Both socket pairs MUST be successfully created. The pairs are allocated one
     * after the other so that a failure at any step never leaves a pair behind:
     * if the stdout pair cannot be created nothing else is allocated, and if the
     * error pair cannot be created the stdout pair is closed before throwing.
     *
     * @return self a fully initialized transport instance
     *
     * @throws RuntimeException if socket allocation fails
     */
    public static function create(): self
    {
        $outputPair = stream_socket_pair(\STREAM_PF_UNIX, \STREAM_SOCK_STREAM, 0);

        // @codeCoverageIgnoreStart
        if (! \is_array($outputPair) || 2 !== \count($outputPair)) {
            throw RuntimeException::forWorkerOutputAllocationFailure();
        }

        // @codeCoverageIgnoreEnd

        // Allocated only after the stdout pair is known to be valid, so the failure
        // branch above has nothing to release.
        $errorPair = stream_socket_pair(\STREAM_PF_UNIX, \STREAM_SOCK_STREAM, 0);

        // @codeCoverageIgnoreStart
        if (! \is_array($errorPair) || 2 !== \count($errorPair)) {
            // Release the already-allocated stdout pair before reporting the failure.
            fclose($outputPair[0]);
            fclose($outputPair[1]);

            throw RuntimeException::forWorkerOutputAllocationFailure();
        }

        // @codeCoverageIgnoreEnd

        return new self(
            outputReader: $outputPair[0],
            outputWriter: $outputPair[1],
            errorReader: $errorPair[0],
            errorWriter: $errorPair[1],
        );
    }

    /**
     * Activates the parent side of the transport.
     *
     * Closes both writer streams and switches any reader that is still open
     * to non-blocking mode.
     */
    public function activateParentSide(): void
    {
        $this->closeStream($this->outputWriter);
        $this->closeStream($this->errorWriter);

        if (\is_resource($this->outputReader)) {
            stream_set_blocking($this->outputReader, false);
        }

        if (\is_resource($this->errorReader)) {
            stream_set_blocking($this->errorReader, false);
        }
    }

    /**
     * Activates the child side of the transport.
     *
     * The child process MUST close parent-side readers to prevent descriptor leaks.
     */
    public function activateChildSide(): void
    {
        $this->closeStream($this->outputReader);
        $this->closeStream($this->errorReader);
    }

    /**
     * Returns the currently readable transport streams.
     *
     * Only reader streams that are still open resources are returned; the stdout
     * reader, when present, precedes the error reader.
     *
     * @return array<int, resource> list of readable streams
     */
    public function getReadableStreams(): array
    {
        $streams = [];

        if (\is_resource($this->outputReader)) {
            $streams[] = $this->outputReader;
        }

        if (\is_resource($this->errorReader)) {
            $streams[] = $this->errorReader;
        }

        return $streams;
    }

    /**
     * Drains readable data from the transport into internal buffers.
     *
     * When $readableStreams is empty both readers are drained; otherwise only the
     * readers contained in that list are touched. When finalization is requested,
     * readers that have reached EOF are closed.
     *
     * @param int $workerPid PID associated with the drained worker
     * @param array<int, resource> $readableStreams optional subset of readable streams
     * @param bool $final whether this is the final drain operation
     * @param ?LoggerInterface $logger logger used for chunk-level output events
     */
    public function drain(
        int $workerPid,
        array $readableStreams = [],
        bool $final = false,
        ?LoggerInterface $logger = null,
    ): void {
        // The close callbacks reference the properties directly so that closeStream()
        // can reset the property itself (it receives its argument by reference).
        $this->output .= $this->readFrom(
            stream: $this->outputReader,
            readableStreams: $readableStreams,
            final: $final,
            level: 'debug',
            message: 'Worker stdout.',
            workerPid: $workerPid,
            close: function (): void {
                $this->closeStream($this->outputReader);
            },
            logger: $logger,
        );

        $this->errorOutput .= $this->readFrom(
            stream: $this->errorReader,
            readableStreams: $readableStreams,
            final: $final,
            level: 'warning',
            message: 'Worker stderr.',
            workerPid: $workerPid,
            close: function (): void {
                $this->closeStream($this->errorReader);
            },
            logger: $logger,
        );
    }

    /**
     * Writes a stdout chunk to the child transport.
     *
     * The operation MUST attempt to write the full chunk.
     *
     * @param string $chunk data to write to the standard output stream
     */
    public function writeOutput(string $chunk): void
    {
        $this->writeTo($this->outputWriter, $chunk);
    }

    /**
     * Writes an error-output chunk to the child transport.
     *
     * The operation MUST attempt to write the full chunk.
     *
     * @param string $chunk data to write to the error stream
     */
    public function writeErrorOutput(string $chunk): void
    {
        $this->writeTo($this->errorWriter, $chunk);
    }

    /**
     * Closes the child-side writer streams.
     *
     * This method SHOULD be invoked after callback execution completes.
     */
    public function closeChildSide(): void
    {
        $this->closeStream($this->outputWriter);
        $this->closeStream($this->errorWriter);
    }

    /**
     * Returns the accumulated stdout.
     *
     * @return string captured standard output
     */
    public function getOutput(): string
    {
        return $this->output;
    }

    /**
     * Returns the accumulated error output.
     *
     * @return string captured error output
     */
    public function getErrorOutput(): string
    {
        return $this->errorOutput;
    }

    /**
     * Writes a chunk to the provided stream until no more bytes can be written.
     *
     * Writing is best-effort: an empty chunk or a stream that is not an open
     * resource is ignored, and a failed or zero-byte write stops the loop and
     * silently discards whatever remains.
     *
     * @param resource|null $stream Target stream
     * @param string $chunk Data to write
     */
    private function writeTo($stream, string $chunk): void
    {
        if ('' === $chunk || ! \is_resource($stream)) {
            return;
        }

        $remaining = $chunk;

        while ('' !== $remaining) {
            // Diagnostics are suppressed on purpose; failure is detected through the
            // return value instead.
            $written = @fwrite($stream, $remaining);

            // A zero-byte write would otherwise loop forever on the same payload.
            if (false === $written || 0 === $written) {
                break;
            }

            $remaining = substr($remaining, $written);
        }
    }

    /**
     * Reads data from a stream and returns the collected chunk.
     *
     * If the stream is not an open resource, or a non-empty selection is given that
     * does not contain it, the method returns an empty string without reading.
     * A non-empty chunk is logged (when a logger is provided) before the EOF check.
     * When finalization is enabled and EOF is reached, the stream is closed through
     * the supplied closure, whether or not any data was read.
     *
     * @param resource|null $stream source stream
     * @param array<int, resource> $readableStreams selected readable streams
     * @param bool $final whether this is the final drain operation
     * @param string $level log level
     * @param string $message log message
     * @param int $workerPid worker identifier
     * @param Closure $close closure responsible for closing the stream
     * @param ?LoggerInterface $logger logger used for output events
     *
     * @return string the read chunk, or an empty string when nothing was read
     */
    private function readFrom(
        $stream,
        array $readableStreams,
        bool $final,
        string $level,
        string $message,
        int $workerPid,
        Closure $close,
        ?LoggerInterface $logger = null,
    ): string {
        if (! \is_resource($stream)) {
            return '';
        }

        if ([] !== $readableStreams && ! \in_array($stream, $readableStreams, true)) {
            return '';
        }

        $chunk = stream_get_contents($stream);
        $hasData = \is_string($chunk) && '' !== $chunk;

        if ($hasData) {
            $logger?->log($level, $message, [
                'worker_pid' => $workerPid,
                'output' => $chunk,
            ]);
        }

        // Single EOF check shared by the "data" and "no data" paths.
        if ($final && feof($stream)) {
            $close();
        }

        return $hasData ? $chunk : '';
    }

    /**
     * Closes a stream resource and clears its reference.
     *
     * The argument is taken by reference so the caller's variable (typically one of
     * this object's stream properties) is reset to null after closing. Values that
     * are not open resources are left untouched.
     *
     * @param resource|null $stream stream to close
     */
    private function closeStream(&$stream): void
    {
        if (! \is_resource($stream)) {
            return;
        }

        fclose($stream);
        $stream = null;
    }
}