Skip to content

Record filtering and mapping #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 20 commits into
base: 1.x
Choose a base branch
from
Draft
7 changes: 6 additions & 1 deletion src/Contracts/Ingest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ interface Ingest
{
public function write(Record $record): void;

public function ping(): void;
public function ping(): void; // todo remove

/**
* @param (callable(Record): bool) $filter
*/
public function filter(callable $filter): void;

public function digest(): void;

Expand Down
19 changes: 19 additions & 0 deletions src/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Laravel\Nightwatch\Contracts\Ingest;
use Laravel\Nightwatch\Facades\Nightwatch;
use Laravel\Nightwatch\Hooks\GuzzleMiddleware;
use Laravel\Nightwatch\Records\Record;
use Laravel\Nightwatch\State\CommandState;
use Laravel\Nightwatch\State\RequestState;
use Throwable;
Expand Down Expand Up @@ -47,6 +48,24 @@ public function user(callable $callback): void
$this->userDetailsResolver = $callback;
}

/**
* @api
*
* @param (callable(Record): bool) $callback
*/
public function filter(callable $callback): void
{
$this->ingest->filter(function (Record $record) use ($callback) {
try {
return $callback($record);
} catch (Throwable $e) {
Nightwatch::unrecoverableExceptionOccurred($e);

return false;
}
});
}

/**
* @api
*/
Expand Down
15 changes: 14 additions & 1 deletion src/Ingest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ final class Ingest implements IngestContract
*/
private array $timeout;

/**
* @var list<(callable(Record): bool)>
*/
private array $filters = [];

/**
* @param (callable(string $address, float $timeout): resource) $streamFactory
*/
Expand All @@ -50,6 +55,14 @@ public function __construct(
];
}

/**
* @param (callable(Record): bool) $callback
*/
public function filter(callable $callback): void
{
$this->filters[] = $callback;
}

public function write(Record $record): void
{
$this->buffer->write($record);
Expand All @@ -71,7 +84,7 @@ public function ping(): void

public function digest(): void
{
$this->transmit($this->buffer->pull());
$this->transmit($this->buffer->pull($this->filters));
}

private function transmit(Payload $payload): void
Expand Down
23 changes: 20 additions & 3 deletions src/RecordsBuffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Countable;
use Laravel\Nightwatch\Records\Record;

use function array_shift;
use function count;
use function json_encode;

Expand All @@ -28,15 +29,31 @@ public function count(): int
return count($this->records);
}

public function pull(): Payload
/**
* @param list<(callable(Record): bool)> $filters
*/
public function pull(array $filters = []): Payload
{
if ($this->records === []) {
return Payload::json('[]');
}

$records = json_encode($this->records, flags: JSON_THROW_ON_ERROR);
if ($filters === []) {
$records = $this->records;
$this->records = [];
} else {
$records = [];

$this->records = [];
while ($record = array_shift($this->records)) {
foreach ($filters as $filter) {
if ($filter($record)) {
$records[] = $record;
}
}
}
}

$records = json_encode($records, flags: JSON_THROW_ON_ERROR);

return Payload::json($records);
}
Expand Down
15 changes: 14 additions & 1 deletion tests/FakeIngest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class FakeIngest implements Ingest
*/
public array $writes = [];

/**
* @var list<(callable(Record): bool)>
*/
public array $filters = [];

public function __construct(
public RecordsBuffer $buffer = new RecordsBuffer,
) {
Expand All @@ -35,9 +40,17 @@ public function write(Record $record): void
$this->buffer->write($record);
}

/**
* @param (callable(Record): bool) $filter
*/
public function filter(callable $filter): void
{
$this->filters[] = $filter;
}

public function digest(): void
{
$this->writes[] = $this->buffer->pull()->rawPayload();
$this->writes[] = $this->buffer->pull($this->filters)->rawPayload();
}

public function ping(): void
Expand Down
6 changes: 5 additions & 1 deletion tests/FakeRecord.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@

class FakeRecord extends Record
{
public string $t = 'fake-record';
public function __construct(
public string $t = 'fake-record',
) {
//
}
}
71 changes: 71 additions & 0 deletions tests/FakeTcpStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

namespace Tests;

use Illuminate\Support\Collection;
use PHPUnit\Framework\Assert;
use RuntimeException;

use function call_user_func_array;
use function is_string;
use function strlen;

class FakeTcpStream
{
private static ?Collection $instances = null;

public $context;

public string $value = '';

public function __construct()
{
self::instances()->push($this);
}

public static function instances(): Collection
{
return self::$instances ??= new Collection;
}

/**
* @param array<mixed> $arguments
*/
public function __call(string $name, array $arguments): mixed
{
$handler = match ($name) {
'stream_open' => fn (string $path, string $mode, int $options, ?string &$openedPath): bool => true,
'stream_set_option' => fn (int $option, int $arg1, int $arg2): bool => true,
'stream_write' => function (string $value): int {
$this->value .= $value;

return strlen($value);
},
'stream_read' => fn (int $length): string|false => '2:OK',
'stream_eof' => fn (): bool => false,
'stream_flush' => fn (): bool => true,
'stream_close' => function (): void {
//
},
default => throw new RuntimeException("FakeTcpStream method not implemented [{$name}]"),
};

return call_user_func_array($handler, $arguments);
}

public function assertWritten(string|callable $value): self
{
if (is_string($value)) {
Assert::assertSame($value, $this->value);
} else {
Assert::assertTrue($value($this->value));
}

return $this;
}

public static function flush(): void
{
self::$instances = null;
}
}
134 changes: 134 additions & 0 deletions tests/Feature/FilteringAndMappingTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<?php

namespace Tests\Feature;

use Illuminate\Support\Facades\DB;
use Laravel\Nightwatch\Facades\Nightwatch;
use Laravel\Nightwatch\Payload;
use Laravel\Nightwatch\Records\Query;
use Laravel\Nightwatch\Records\Record;
use RuntimeException;
use Tests\FakeRecord;
use Tests\TestCase;

use function array_shift;
use function collect;
use function is_numeric;
use function str_contains;

class FilteringAndMappingTest extends TestCase
{
public function test_it_can_filter_records()
{
$streams = $this->fakeTcpStreams();

Nightwatch::filter(function (Record $record): bool {
if ($record instanceof Query) {
return ! str_contains($record->sql, 'Laravel 3');
}

return true;
});

DB::select('select * from users where name = "Laravel 1"');
DB::select('select * from users where name = "Laravel 2"');
DB::select('select * from users where name = "Laravel 3"');
$this->core->digest();

$streams->first()->assertWritten(function ($value) {
$this->assertStringContainsString('Laravel 1', $value);
$this->assertStringContainsString('Laravel 2', $value);
$this->assertStringNotContainsString('Laravel 3', $value);

return true;
});
}

public function test_filtered_payloads_are_always_an_array(): void
{
$streams = $this->fakeTcpStreams();
$filterResult = [false, true];

Nightwatch::filter(function (Record $record) use (&$filterResult): bool {
return array_shift($filterResult);
});

$this->core->ingest->write(new FakeRecord);
$this->core->ingest->write(new FakeRecord);
$this->core->digest();

$streams->first()->assertWritten('29:'.Payload::SIGNATURE.':[{"t":"fake-record"}]');
}

public function test_it_filters_falsey_values()
{
$streams = $this->fakeTcpStreams();
$filterResult = [null, false, '', 0, true];

Nightwatch::filter(function (Record $record) use (&$filterResult): mixed {
return array_shift($filterResult);
});

$this->core->ingest->write(new FakeRecord);
$this->core->ingest->write(new FakeRecord);
$this->core->ingest->write(new FakeRecord);
$this->core->ingest->write(new FakeRecord);
$this->core->ingest->write(new FakeRecord('accepted-record'));
$this->core->digest();

$streams->first()->assertWritten('33:'.Payload::SIGNATURE.':[{"t":"accepted-record"}]');
}

public function test_it_rejects_records_when_exceptions_occurs()
{
$streams = $this->fakeTcpStreams();
Nightwatch::handleUnrecoverableExceptionsUsing(($exceptions = collect())->push(...));

Nightwatch::filter(function (Record $record): mixed {
if (is_numeric($record->t) && ($record->t % 2)) {
throw new RuntimeException("Whoops {$record->t}");
}

return true;
});

$this->core->ingest->write(new FakeRecord('1')); // throw
$this->core->ingest->write(new FakeRecord('2'));
$this->core->ingest->write(new FakeRecord('3')); // throw
$this->core->ingest->write(new FakeRecord('4'));
$this->core->digest();

$streams->first()->assertWritten('29:'.Payload::SIGNATURE.':[{"t":"2"},{"t":"4"}]');
$this->assertCount(2, $exceptions);
$this->assertSame('Whoops 1', $exceptions[0]->getMessage());
$this->assertSame('Whoops 3', $exceptions[1]->getMessage());
}

public function test_it_has_already_resolved_lazy_values()
{
$this->markTestIncomplete('TODO');
}

public function test_it_can_modify_records()
{
$streams = $this->fakeTcpStreams();

Nightwatch::filter(function (Record $record): bool {
if ($record instanceof Query) {
$record->sql = 'sleep 10';
}

return true;
});

DB::select('select * from users');
$this->core->digest();

$streams->first()->assertWritten(function ($value) {
$this->assertStringContainsString('"sql":"sleep 10"', $value);
$this->assertStringNotContainsString('select * from users', $value);

return true;
});
}
}
5 changes: 4 additions & 1 deletion tests/Feature/Sensors/OutgoingRequestSensorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ public function test_it_can_use_guzzle_directly(): void
$stack->setHandler(new CurlHandler);
$stack->push(Nightwatch::guzzleMiddleware());
$client = new Client(['handler' => $stack]);
$client->get('https://laravel.com');
$client->get('https://laravel.com', [
'connection_timeout' => 1,
'timeout' => 1,
]);
Comment on lines +217 to +220
Copy link
Member Author

@timacdonald timacdonald May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is testing guzzle and does a real request. Setting timeouts here so that it fails quickly when there are network issues.

});

$response = $this->post('/users');
Expand Down
Loading