Skip to content

Commit 1f7ddcf

Browse files
authored
concurrent stream processing support (#10)
Signed-off-by: rahul <[email protected]>
1 parent cde144c commit 1f7ddcf

File tree

4 files changed

+185
-111
lines changed

4 files changed

+185
-111
lines changed

Diff for: src/Stream.php

-49
This file was deleted.

Diff for: src/StreamHandler.php

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
3+
namespace rcsofttech85\FileHandler;
4+
5+
use Fiber;
6+
use rcsofttech85\FileHandler\Exception\StreamException;
7+
use Throwable;
8+
9+
class StreamHandler
10+
{
11+
private array $fibers = [];
12+
13+
14+
/**
15+
* @throws StreamException
16+
*/
17+
public function __construct(public readonly array $streamUrls, public readonly int $chunk = 100)
18+
{
19+
if (!$this->streamUrls) {
20+
throw new StreamException('No stream URLs provided.');
21+
}
22+
}
23+
24+
/**
25+
*/
26+
private function stream(string $streamUrl, string $outputFilename): Fiber
27+
{
28+
return new Fiber(function () use ($streamUrl, $outputFilename) {
29+
$stream = fopen($streamUrl, 'r');
30+
if (!$stream) {
31+
throw new StreamException("Failed to open stream: $streamUrl");
32+
}
33+
stream_set_blocking($stream, false);
34+
35+
$outputFile = fopen($outputFilename, 'w');
36+
37+
try {
38+
while (!feof($stream)) {
39+
$contents = fread($stream, $this->chunk);
40+
fwrite($outputFile, $contents);
41+
Fiber::suspend();
42+
}
43+
} catch (Throwable $e) {
44+
throw new StreamException();
45+
} finally {
46+
fclose($stream);
47+
fclose($outputFile);
48+
}
49+
});
50+
}
51+
52+
/**
53+
*/
54+
public function initiateConcurrentStreams(): self
55+
{
56+
foreach ($this->streamUrls as $outputFile => $streamUrl) {
57+
$fiber = $this->stream($streamUrl, $outputFile);
58+
59+
$this->fibers[] = $fiber;
60+
}
61+
62+
return $this;
63+
}
64+
65+
/**
66+
* @throws StreamException
67+
* @throws Throwable
68+
*/
69+
public function start(): self
70+
{
71+
if (!$this->fibers) {
72+
throw new StreamException("No fibers available to start");
73+
}
74+
75+
/** @var Fiber $fiber */
76+
foreach ($this->fibers as $fiber) {
77+
$fiber->start();
78+
}
79+
80+
return $this;
81+
}
82+
83+
/**
84+
* @throws Throwable
85+
*/
86+
public function resume(bool $resumeOnce = false): void
87+
{
88+
if (!$this->fibers) {
89+
throw new StreamException("No fibers are currently running");
90+
}
91+
92+
/** @var Fiber $fiber */
93+
foreach ($this->fibers as $fiber) {
94+
while (!$fiber->isTerminated()) {
95+
$fiber->resume();
96+
if ($resumeOnce) {
97+
break;
98+
}
99+
}
100+
}
101+
102+
$this->fibers = [];
103+
}
104+
}

Diff for: tests/Integration/StreamHandlerTest.php

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php
2+
3+
namespace Integration;
4+
5+
use PHPUnit\Framework\Attributes\DataProvider;
6+
use PHPUnit\Framework\Attributes\Group;
7+
use PHPUnit\Framework\Attributes\Test;
8+
use PHPUnit\Framework\TestCase;
9+
use rcsofttech85\FileHandler\Exception\StreamException;
10+
use rcsofttech85\FileHandler\StreamHandler;
11+
12+
#[Group("integration")]
13+
class StreamHandlerTest extends TestCase
14+
{
15+
public static function tearDownAfterClass(): void
16+
{
17+
parent::tearDownAfterClass();
18+
19+
$files = ["output.html", "output1.html", "output2.html"];
20+
21+
foreach ($files as $file) {
22+
if (file_exists($file)) {
23+
unlink($file);
24+
}
25+
}
26+
}
27+
28+
#[Test]
29+
#[DataProvider('streamDataProvider')]
30+
public function streamAndWriteToFile($urls)
31+
{
32+
$stream = new StreamHandler($urls);
33+
$stream->initiateConcurrentStreams()->start()->resume();
34+
35+
foreach ($urls as $file => $url) {
36+
$this->assertGreaterThan(0, filesize($file));
37+
$this->assertStringContainsString('<!DOCTYPE html>', file_get_contents($file));
38+
$this->assertStringContainsString('</html>', file_get_contents($file));
39+
}
40+
}
41+
42+
#[Test]
43+
#[DataProvider('wrongStreamDataProvider')]
44+
public function throwExceptionIfUrlIsInvalid($outputFile, $url)
45+
{
46+
$stream = new StreamHandler([$outputFile => $url]);
47+
48+
49+
$this->expectException(StreamException::class);
50+
$stream->initiateConcurrentStreams()->start()->resume();
51+
}
52+
53+
#[Test]
54+
public function throwExceptionIfEmptyDataProvided()
55+
{
56+
$this->expectException(StreamException::class);
57+
$this->expectExceptionMessage('No stream URLs provided.');
58+
new StreamHandler([]);
59+
}
60+
61+
public static function wrongStreamDataProvider(): iterable
62+
{
63+
yield ["output.html", "https://gist.github"];
64+
}
65+
66+
67+
public static function streamDataProvider(): iterable
68+
{
69+
yield [
70+
[
71+
"output.html" =>
72+
"https://gist.github.com/rcsofttech85/629b37d483c4796db7bdcb3704067631#file-gistfile1-txt",
73+
74+
"output1.html" => "https://gist.github.com/rcsofttech85/f71f2454b1fc40a077cda14ef3097385#file-gistfile1-txt",
75+
76+
77+
"output2.html" => "https://gist.github.com/rcsofttech85/79ab19f1502e72c95cfa97d5205fa47d#file-gistfile1-txt"
78+
]
79+
];
80+
}
81+
}

Diff for: tests/Integration/StreamTest.php

-62
This file was deleted.

0 commit comments

Comments
 (0)