diff --git a/README.md b/README.md index d03ab62..5549032 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ Loop::run(function () { printf("The keyspace %s has a table called %s\n", $row['keyspace_name'], $row['columnfamily_name']); } - yield $cluster->disconnect(); + $session->close(); }); ``` diff --git a/benchmark/read.php b/benchmark/read.php index 3490fcd..c78dce0 100644 --- a/benchmark/read.php +++ b/benchmark/read.php @@ -21,8 +21,8 @@ $session = yield $cluster->connect(); $setup = require __DIR__ . '/shared.php'; - $watcher = Loop::onSignal(SIGTERM, function () use ($cluster) { - yield $cluster->disconnect(); + $watcher = Loop::onSignal(SIGTERM, function () use ($session) { + $session->close(); }); try { @@ -85,7 +85,7 @@ yield $session->query("DROP KEYSPACE IF EXISTS blogs;"); } - yield $cluster->disconnect(); + $session->close(); Loop::cancel($watcher); }); diff --git a/benchmark/write.php b/benchmark/write.php index c4bcf52..b22927e 100644 --- a/benchmark/write.php +++ b/benchmark/write.php @@ -20,8 +20,8 @@ $session = yield $cluster->connect(); $setup = require __DIR__ . '/shared.php'; - $watcher = Loop::onSignal(SIGTERM, function () use ($cluster) { - yield $cluster->disconnect(); + $watcher = Loop::onSignal(SIGTERM, function () use ($session) { + $session->close(); }); try { @@ -63,7 +63,7 @@ yield $session->query("DROP KEYSPACE IF EXISTS blogs;"); } - yield $cluster->disconnect(); + $session->close(); Loop::cancel($watcher); }); diff --git a/examples/basic.php b/examples/basic.php index fcac06e..503d232 100644 --- a/examples/basic.php +++ b/examples/basic.php @@ -22,5 +22,5 @@ \printf("The keyspace %s has a table called %s\n", $row['keyspace_name'], $row['table_name']); } - yield $cluster->disconnect(); + $session->close(); }); diff --git a/src/Cluster.php b/src/Cluster.php index 2279350..1be8912 100644 --- a/src/Cluster.php +++ b/src/Cluster.php @@ -22,8 +22,7 @@ final class Cluster private const STATE_NOT_CONNECTED = 0, STATE_CONNECTING = 1, - STATE_CONNECTED = 2, - STATE_DISCONNECTING = 3 + STATE_CONNECTED = 2 ; /** @@ -42,9 +41,9 @@ final class Cluster private $state = self::STATE_NOT_CONNECTED; /** - * @var Connection + * @var Events */ - private $connection; + private $events; /** * @param Config $config @@ -71,17 +70,32 @@ public static function build(string $dsn): self public function options(): Promise { return call(function () { - if ($this->connection === null) { - $this->connection = yield $this->open(); - } + /** @var Connection $connection */ + $connection = yield $this->open(); /** @var Response\Supported $response */ - $response = yield $this->connection->send(new Request\Options); + $response = yield $connection->send(new Request\Options); + + $connection->close(); return $response->options; }); } + /** + * @return Promise<Events> + */ + public function events(): Promise + { + return call(function () { + if ($this->events) { + return $this->events; + } + + return $this->events = new Events(yield $this->startup()); + }); + } + /** * @param string $keyspace * @@ -96,17 +110,7 @@ public function connect(string $keyspace = null): Promise $this->state = self::STATE_CONNECTING; - if (null === $this->connection) { - $this->connection = yield $this->open(); - } - - $frame = yield $this->connection->send(new Request\Startup($this->config->options())); - - if ($frame instanceof Response\Authenticate) { - yield $this->authenticate(); - } - - $session = new Session($this->connection); + $session = new Session(yield $this->startup()); if ($keyspace !== null) { yield $session->keyspace($keyspace); @@ -119,38 +123,27 @@ public function connect(string $keyspace = null): Promise } /** - * @param int $code - * @param string $reason - * - * @return Promise<void> + * @return Promise<Connection> */ - public function disconnect(int $code = 0, string $reason = ''): Promise + private function startup() { - return call(function() use ($code, $reason) { - if ($this->state === self::STATE_DISCONNECTING) { - return; - } + return call(function () { + /** @var Connection $connection */ + $connection = yield $this->open(); - $this->state = self::STATE_DISCONNECTING; + $request = new Request\Startup($this->config->options()); + $response = yield $connection->send($request); - if ($this->connection !== null) { - $this->connection->close(); + if ($response instanceof Response\Authenticate) { + yield $this->authenticate($connection); } - $this->state = self::STATE_NOT_CONNECTED; + return $connection; }); } /** - * @return bool - */ - public function isConnected(): bool - { - return $this->state === self::STATE_CONNECTED; - } - - /** - * @return Promise + * @return Promise<Connection> */ private function open(): Promise { @@ -178,32 +171,25 @@ private function open(): Promise } /** - * @return Promise + * @param Connection $connection + * + * @return Promise<Response\AuthSuccess> */ - private function authenticate(): Promise + private function authenticate(Connection $connection): Promise { - return call(function () { - $request = new Request\AuthResponse( - $this->config->user(), - $this->config->password() - ); - - /** @var Frame $frame */ - $frame = yield $this->connection->send($request); + return call(function () use ($connection) { + $request = new Request\AuthResponse($this->config->user(), $this->config->password()); + $response = yield $connection->send($request); switch (true) { - case $frame instanceof Response\AuthChallenge: - // TODO - - break; - case $frame instanceof Response\AuthSuccess: - break; + case $response instanceof Response\AuthSuccess: + return $response; default: - throw Exception\ServerException::unexpectedFrame($frame->opcode); + throw Exception\ServerException::unexpectedFrame($response->opcode); } }); } - + /** * @return Compressor */ diff --git a/src/Compressor/LzCompressor.php b/src/Compressor/LzCompressor.php index c65e46b..65a9942 100644 --- a/src/Compressor/LzCompressor.php +++ b/src/Compressor/LzCompressor.php @@ -21,6 +21,7 @@ class LzCompressor implements Compressor */ public function compress(string $data): string { + /** @noinspection PhpUndefinedFunctionInspection */ return lz4_compress($data); } @@ -29,6 +30,7 @@ public function compress(string $data): string */ public function decompress(string $binary): string { + /** @noinspection PhpUndefinedFunctionInspection */ return lz4_uncompress($binary); } } diff --git a/src/Compressor/SnappyCompressor.php b/src/Compressor/SnappyCompressor.php index 16f6f3d..adee6d2 100644 --- a/src/Compressor/SnappyCompressor.php +++ b/src/Compressor/SnappyCompressor.php @@ -21,6 +21,7 @@ class SnappyCompressor implements Compressor */ public function compress(string $data): string { + /** @noinspection PhpUndefinedFunctionInspection */ return snappy_compress($data); } @@ -29,6 +30,7 @@ public function compress(string $data): string */ public function decompress(string $binary): string { + /** @noinspection PhpUndefinedFunctionInspection */ return snappy_uncompress($binary); } } diff --git a/src/Connection.php b/src/Connection.php index 7657dc5..fed0b8a 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -13,12 +13,12 @@ namespace PHPinnacle\Cassis; use function Amp\asyncCall, Amp\call; -use Amp\Socket\ClientTlsContext; use function Amp\Socket\connect, Amp\Socket\cryptoConnect; use Amp\Deferred; use Amp\Loop; use Amp\Promise; use Amp\Socket\ClientConnectContext; +use Amp\Socket\ClientTlsContext; use Amp\Socket\Socket; use Amp\Uri\Uri; @@ -46,11 +46,6 @@ final class Connection */ private $parser; - /** - * @var EventEmitter - */ - private $emitter; - /** * @var \SplQueue */ @@ -67,9 +62,9 @@ final class Connection private $processing = false; /** - * @var Deferred[] + * @var callable[] */ - private $defers = []; + private $callbacks = []; /** * @var int @@ -87,21 +82,9 @@ public function __construct(Uri $uri, Streams $streams, Compressor $compressor) $this->streams = $streams; $this->packer = new Packer($compressor); $this->parser = new Parser($compressor); - $this->emitter = new EventEmitter($this); $this->queue = new \SplQueue; } - /** - * @param string $event - * @param callable $listener - * - * @return Promise - */ - public function register(string $event, callable $listener): Promise - { - return $this->emitter->register($event, $listener); - } - /** * @noinspection PhpDocMissingThrowsInspection * @@ -114,7 +97,16 @@ public function send(Request $request): Promise $stream = $this->streams->reserve(); $deferred = new Deferred; - $this->defers[$stream] = $deferred; + $this->subscribe($stream, function (Frame $frame) use ($deferred) { + if ($frame->opcode === Frame::OPCODE_ERROR) { + /** @var Response\Error $frame */ + $deferred->fail($frame->exception); + } else { + $deferred->resolve($frame); + } + + $this->streams->release($frame->stream); + }); $this->queue->enqueue($this->packer->pack($request, $stream)); @@ -129,12 +121,23 @@ public function send(Request $request): Promise return $deferred->promise(); } + /** + * @param int $stream + * @param callable $handler + * + * @return void + */ + public function subscribe(int $stream, callable $handler): void + { + $this->callbacks[$stream] = $handler; + } + /** * @param int $timeout * @param int $attempts * @param bool $noDelay * - * @return Promise + * @return Promise<self> */ public function open(int $timeout, int $attempts, bool $noDelay): Promise { @@ -164,6 +167,8 @@ public function open(int $timeout, int $attempts, bool $noDelay): Promise } $this->listen(); + + return $this; }); } @@ -176,7 +181,7 @@ public function close(): void $this->socket->close(); } - $this->defers = []; + $this->callbacks = []; } /** @@ -185,15 +190,15 @@ public function close(): void private function write(): void { asyncCall(function () { - $processed = 0; + $done = 0; $data = ''; while ($this->queue->isEmpty() === false) { $data .= $this->queue->dequeue(); - ++$processed; + ++$done; - if ($processed % self::WRITE_ROUNDS === 0) { + if ($done % self::WRITE_ROUNDS === 0) { Loop::defer(function () { $this->write(); }); @@ -204,8 +209,7 @@ private function write(): void yield $this->socket->write($data); - $this->lastWrite = Loop::now(); - + $this->lastWrite = Loop::now(); $this->processing = false; }); } @@ -220,28 +224,13 @@ private function listen(): void $this->parser->append($chunk); while ($frame = $this->parser->parse()) { - if ($frame->opcode === Frame::OPCODE_EVENT) { - /** @var Response\Event $frame */ - $this->emitter->emit($frame); - - continue 2; - } - - if (!isset($this->defers[$frame->stream])) { + if (!isset($this->callbacks[$frame->stream])) { continue 2; } - $deferred = $this->defers[$frame->stream]; - unset($this->defers[$frame->stream]); + asyncCall($this->callbacks[$frame->stream], $frame); - $this->streams->release($frame->stream); - - if ($frame->opcode === Frame::OPCODE_ERROR) { - /** @var Response\Error $frame */ - $deferred->fail($frame->exception); - } else { - $deferred->resolve($frame); - } + unset($this->callbacks[$frame->stream]); } } diff --git a/src/EventEmitter.php b/src/Events.php similarity index 51% rename from src/EventEmitter.php rename to src/Events.php index 2988eaf..2ff2a7b 100644 --- a/src/EventEmitter.php +++ b/src/Events.php @@ -16,7 +16,7 @@ use function Amp\call; use Amp\Promise; -final class EventEmitter +final class Events { /** * @var Connection @@ -34,6 +34,50 @@ final class EventEmitter public function __construct(Connection $connection) { $this->connection = $connection; + + $this->connection->subscribe(-1, function (Frame $frame) { + if ($frame->opcode !== Frame::OPCODE_EVENT) { + return; + } + + /** @var Response\Event $frame */ + $event = $frame->event; + $listeners = $this->listeners[$event->type()] ?? []; + + foreach ($listeners as $listener) { + asyncCall($listener, $event); + } + }); + } + + /** + * @param callable $listener + * + * @return Promise + */ + public function onSchemaChange(callable $listener): Promise + { + return $this->register(Event::SCHEMA_CHANGE, $listener); + } + + /** + * @param callable $listener + * + * @return Promise + */ + public function onStatusChange(callable $listener): Promise + { + return $this->register(Event::STATUS_CHANGE, $listener); + } + + /** + * @param callable $listener + * + * @return Promise + */ + public function onTopologyChange(callable $listener): Promise + { + return $this->register(Event::TOPOLOGY_CHANGE, $listener); } /** @@ -42,7 +86,7 @@ public function __construct(Connection $connection) * * @return Promise */ - public function register(string $event, callable $listener): Promise + private function register(string $event, callable $listener): Promise { return call(function() use ($event, $listener) { if (!isset($this->listeners[$event])) { @@ -54,44 +98,10 @@ public function register(string $event, callable $listener): Promise } /** - * @param Frame $frame + * Clear resources */ - public function emit(Frame $frame): void + public function __destruct() { - asyncCall(function () use ($frame) { - $event = null; - - switch (true) { - case $frame instanceof Response\TopologyChange: - $event = new Event\TopologyChange($frame->change, \inet_ntop($frame->address)); - - break; - case $frame instanceof Response\StatusChange: - $event = new Event\StatusChange($frame->change, \inet_ntop($frame->address)); - - break; - case $frame instanceof Response\SchemaChange: - $event = new Event\SchemaChange( - $frame->change, - $frame->target, - $frame->keyspace, - $frame->name, - $frame->arguments - ); - - break; - } - - if ($event === null) { - return; - } - - $class = \get_class($event); - $listeners = $this->listeners[$class] ?? []; - - foreach ($listeners as $listener) { - asyncCall($listener, $event); - } - }); + $this->connection->close(); } } diff --git a/src/Parser.php b/src/Parser.php index 4342c56..d79d1ba 100644 --- a/src/Parser.php +++ b/src/Parser.php @@ -81,7 +81,7 @@ public function parse(): ?Frame $type = $this->readBuffer->readByte(0); $flags = $this->readBuffer->readByte(1); - $stream = $this->readBuffer->readShort(2); + $stream = $this->readBuffer->readSmallInt(2); $opcode = $this->readBuffer->readByte(4); $length = $this->readBuffer->readInt(5); diff --git a/src/Result/Rows.php b/src/Result/Rows.php index 8dc3e8c..679b3e0 100644 --- a/src/Result/Rows.php +++ b/src/Result/Rows.php @@ -23,7 +23,7 @@ final class Rows implements Result, \Iterator, \Countable, \ArrayAccess /** * @var \SplFixedArray */ - public $data; + private $data; /** * @var Metadata @@ -34,7 +34,7 @@ final class Rows implements Result, \Iterator, \Countable, \ArrayAccess * @param \SplFixedArray $data * @param Metadata $meta */ - public function __construct(\SplFixedArray $data, Metadata $meta) + private function __construct(\SplFixedArray $data, Metadata $meta) { $this->data = $data; $this->meta = $meta; @@ -146,14 +146,6 @@ public function offsetUnset($offset) throw new \BadMethodCallException('Rows result are immutable.'); } - /** - * @return Metadata - */ - public function meta(): Metadata - { - return $this->meta; - } - /** * @return string|null */ diff --git a/src/Session.php b/src/Session.php index a5ee9fb..e248d76 100644 --- a/src/Session.php +++ b/src/Session.php @@ -53,17 +53,6 @@ public function keyspace(string $keyspace): Promise }); } - /** - * @param string $event - * @param callable $listener - * - * @return Promise - */ - public function register(string $event, callable $listener): Promise - { - return $this->connection->register($event, $listener); - } - /** * @param string $cql * @param array $values @@ -104,6 +93,7 @@ public function prepare(string $cql): Promise public function execute(Statement $statement, Context $context = null): Promise { return call(function () use ($statement, $context) { + /** @var Response\Result $response */ $request = $this->request($statement, $context ?: new Context); $response = yield $this->connection->send($request); @@ -141,4 +131,12 @@ private function request(Statement $statement, Context $context): Request throw new Exception\ClientException; } } + + /** + * @return void + */ + public function close(): void + { + $this->connection->close(); + } } diff --git a/src/Streams.php b/src/Streams.php index 549a1a0..5d6550e 100644 --- a/src/Streams.php +++ b/src/Streams.php @@ -32,11 +32,11 @@ final class Streams private $stack; /** - * Closed constructor + * @param \SplStack $stack */ - private function __construct() + private function __construct(\SplStack $stack) { - $this->stack = new \SplStack; + $this->stack = $stack; } /** @@ -44,7 +44,7 @@ private function __construct() */ public static function instance(): self { - return self::$instance ?: self::$instance = new self; + return self::$instance ?: self::$instance = new self(new \SplStack); } /** diff --git a/src/Type/Custom.php b/src/Type/Custom.php index d40e6f3..59a020d 100644 --- a/src/Type/Custom.php +++ b/src/Type/Custom.php @@ -12,7 +12,6 @@ use PHPinnacle\Cassis\Buffer; use PHPinnacle\Cassis\Type; -use PHPinnacle\Cassis\Value; final class Custom implements Type { @@ -40,7 +39,7 @@ public function class(): string /** * {@inheritdoc} */ - public function read(Buffer $buffer): Value + public function read(Buffer $buffer) { // TODO } diff --git a/src/Value/Decimal.php b/src/Value/Decimal.php index bc360a0..8513c47 100644 --- a/src/Value/Decimal.php +++ b/src/Value/Decimal.php @@ -8,6 +8,10 @@ * file that was distributed with this source code. */ +/** + * @noinspection PhpComposerExtensionStubsInspection + */ + declare(strict_types = 1); namespace PHPinnacle\Cassis\Value; @@ -45,7 +49,7 @@ public function __construct(\GMP $value, int $scale) */ public static function fromString(string $value, int $scale): self { - return new self(bigint_init($value), $scale); + return new self(\bigint_init($value), $scale); } /** @@ -56,7 +60,7 @@ public static function fromString(string $value, int $scale): self */ public static function fromBytes(string $bytes, int $scale): self { - return new self(bigint_import($bytes), $scale); + return new self(\bigint_import($bytes), $scale); } /** @@ -64,7 +68,7 @@ public static function fromBytes(string $bytes, int $scale): self */ public function value(): string { - return \gmp_strval($this->value); + return \bigint_strval($this->value); } /** @@ -80,7 +84,7 @@ public function scale(): int */ public function write(Buffer $buffer): void { - $binary = \gmp_export($this->value); + $binary = \bigint_export($this->value); $buffer ->appendInt(4 + \strlen($binary)) @@ -95,7 +99,7 @@ public function write(Buffer $buffer): void public function __toString(): string { $sign = ''; - $value = \gmp_strval($this->value); + $value = \bigint_strval($this->value); $length = \strlen($value); if ($value[0] === '-') { diff --git a/src/Value/Varint.php b/src/Value/Varint.php index 0731749..f1a0924 100644 --- a/src/Value/Varint.php +++ b/src/Value/Varint.php @@ -8,6 +8,10 @@ * file that was distributed with this source code. */ +/** + * @noinspection PhpComposerExtensionStubsInspection + */ + declare(strict_types = 1); namespace PHPinnacle\Cassis\Value; @@ -37,7 +41,7 @@ public function __construct(\GMP $value) */ public static function fromString(string $value): self { - return new self(bigint_init($value)); + return new self(\bigint_init($value)); } /** @@ -47,7 +51,7 @@ public static function fromString(string $value): self */ public static function fromBytes(string $bytes): self { - return new self(bigint_import($bytes)); + return new self(\bigint_import($bytes)); } /** @@ -55,7 +59,7 @@ public static function fromBytes(string $bytes): self */ public function write(Buffer $buffer): void { - $binary = gmp_export($this->value); + $binary = \bigint_export($this->value); $buffer ->appendInt(\strlen($binary)) @@ -68,6 +72,6 @@ public function write(Buffer $buffer): void */ public function __toString(): string { - return \gmp_strval($this->value); + return \bigint_strval($this->value); } } diff --git a/src/functions.php b/src/functions.php index 659711f..44a4be6 100644 --- a/src/functions.php +++ b/src/functions.php @@ -1,4 +1,16 @@ <?php +/** + * This file is part of PHPinnacle/Cassis. + * + * (c) PHPinnacle Team <dev@phpinnacle.com> + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +/** + * @noinspection PhpComposerExtensionStubsInspection + */ if (!\function_exists('is_assoc')) { function is_assoc(array $values): bool @@ -12,6 +24,7 @@ function bigint_init(string $value): \GMP { \error_clear_last(); + /** @var \GMP $gmp */ $gmp = @\gmp_init($value); if ($error = \error_get_last()) { @@ -22,6 +35,13 @@ function bigint_init(string $value): \GMP } } +if (!\function_exists('bigint_strval')) { + function bigint_strval(\GMP $gmp): string + { + return \gmp_strval($gmp); + } +} + if (!\function_exists('bigint_import')) { function bigint_import(string $bytes): \GMP { @@ -36,3 +56,10 @@ function bigint_import(string $bytes): \GMP return $gmp; } } + +if (!\function_exists('bigint_export')) { + function bigint_export(\GMP $gmp): string + { + return \gmp_export($gmp); + } +} diff --git a/tests/ClusterTest.php b/tests/ClusterTest.php index c861e27..e2d2727 100644 --- a/tests/ClusterTest.php +++ b/tests/ClusterTest.php @@ -30,7 +30,7 @@ public static function setUpBeforeClass(): void WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };" ); - yield $cluster->disconnect(); + $session->close(); }); } @@ -55,8 +55,6 @@ public function testOptions() foreach ($options as $option => $values) { self::assertIsArray($values); } - - yield $cluster->disconnect(); } public function testConnect() @@ -65,19 +63,19 @@ public function testConnect() $promise = $cluster->connect(); self::assertPromise($promise); - self::assertFalse($cluster->isConnected()); + /** @var Session $session */ $session = yield $promise; self::assertInstanceOf(Session::class, $session); - self::assertTrue($cluster->isConnected()); - yield $cluster->disconnect(); + $session->close(); } public function testConnectWithKeyspace() { $cluster = self::cluster(); + /** @var Session $session */ $session = yield $cluster->connect('simplex'); $ref = new \ReflectionProperty(Session::class, 'keyspace'); @@ -85,33 +83,7 @@ public function testConnectWithKeyspace() self::assertSame('simplex', $ref->getValue($session)); - yield $cluster->disconnect(); - } - - public function testDisconnectTwice() - { - $cluster = self::cluster(); - - yield $cluster->connect(); - - yield $cluster->disconnect(); - yield $cluster->disconnect(); - - self::assertFalse($cluster->isConnected()); - } - - public function testConnectTwice() - { - $this->expectException(ClientException::class); - - try { - $cluster = self::cluster(); - - yield $cluster->connect(); - yield $cluster->connect(); - } finally { - yield $cluster->disconnect(); - } + $session->close(); } public function testConnectWithUnknownKeyspace() @@ -127,7 +99,7 @@ public function testConnectFailure() { $this->expectException(ClientException::class); - $cluster = cluster::build('tcp://localhost:19042'); + $cluster = Cluster::build('tcp://localhost:19042'); yield $cluster->connect(); } @@ -141,7 +113,7 @@ public static function tearDownAfterClass(): void yield $session->query("DROP KEYSPACE IF EXISTS simplex;"); - yield $cluster->disconnect(); + $session->close(); }); } } diff --git a/tests/CollectionsTest.php b/tests/CollectionsTest.php index 3be7743..5af8d67 100644 --- a/tests/CollectionsTest.php +++ b/tests/CollectionsTest.php @@ -37,7 +37,7 @@ public static function setUpBeforeClass(): void yield $session->query(\trim($query)); } - yield $cluster->disconnect(); + $session->close(); }); } @@ -75,7 +75,7 @@ public function testDataTypes() self::assertEquals($map->values(), $result['map_value']->values()); self::assertEquals($map->keys(), $result['map_value']->keys()); - yield $cluster->disconnect(); + $session->close(); } public static function tearDownAfterClass(): void @@ -87,7 +87,7 @@ public static function tearDownAfterClass(): void yield $session->query("DROP KEYSPACE simplex;"); - yield $cluster->disconnect(); + $session->close(); }); } } diff --git a/tests/EventsTest.php b/tests/EventsTest.php new file mode 100644 index 0000000..90dde35 --- /dev/null +++ b/tests/EventsTest.php @@ -0,0 +1,72 @@ +<?php +/** + * This file is part of PHPinnacle/Cassis. + * + * (c) PHPinnacle Team <dev@phpinnacle.com> + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace PHPinnacle\Cassis\Tests; + +use Amp\Loop; +use PHPinnacle\Cassis\Event; +use PHPinnacle\Cassis\Events; +use PHPinnacle\Cassis\Session; + +class EventsTest extends AsyncTest +{ + public static function setUpBeforeClass(): void + { + Loop::run(function () { + $cluster = self::cluster(); + /** @var Session $session */ + $session = yield $cluster->connect(); + + yield $session->query( + "CREATE KEYSPACE IF NOT EXISTS simplex + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };" + ); + + $session->close(); + }); + } + + public function testRegisterEvent() + { + $cluster = self::cluster(); + /** @var Session $session */ + $session = yield $cluster->connect('simplex'); + + yield $session->query( + 'CREATE TABLE simple (id int PRIMARY KEY, enabled boolean);' + ); + + /** @var Events $events */ + $events = yield $cluster->events(); + + yield $events->onSchemaChange(function (Event\SchemaChange $event) { + self::assertEquals('UPDATED', $event->change()); + self::assertEquals('TABLE', $event->target()); + self::assertEquals('simplex', $event->keyspace()); + self::assertEquals('simple', $event->name()); + }); + + yield $session->query('ALTER TABLE simple ADD name text;'); + + $session->close(); + } + + public static function tearDownAfterClass(): void + { + Loop::run(function () { + $cluster = self::cluster(); + /** @var Session $session */ + $session = yield $cluster->connect(); + + yield $session->query("DROP KEYSPACE IF EXISTS simplex;"); + $session->close(); + }); + } +} diff --git a/tests/SessionTest.php b/tests/SessionTest.php index 3066ca2..e2578cf 100644 --- a/tests/SessionTest.php +++ b/tests/SessionTest.php @@ -30,7 +30,7 @@ public static function setUpBeforeClass(): void WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };" ); - yield $cluster->disconnect(); + $session->close(); }); } @@ -49,7 +49,7 @@ public function testChangeKeyspace() yield $session->query("DROP KEYSPACE simplex_2;"); - yield $cluster->disconnect(); + $session->close(); } public function testSimpleQueries() @@ -82,7 +82,7 @@ public function testSimpleQueries() self::assertEquals(3, $rows[2]['id']); self::assertEquals(true, $rows[2]['enabled']); - yield $cluster->disconnect(); + $session->close(); } public function testPaging() @@ -125,7 +125,7 @@ public function testPaging() self::assertEquals(5, $rows2[1]['ordering']); self::assertNull($rows2->cursor()); - yield $cluster->disconnect(); + $session->close(); } public function testPreparedQueries() @@ -154,7 +154,7 @@ public function testPreparedQueries() self::assertEquals(2, $rows[1]['id']); self::assertEquals(false, $rows[1]['enabled']); - yield $cluster->disconnect(); + $session->close(); } // // public function testBatchQueries() @@ -186,27 +186,6 @@ public function testPreparedQueries() // // yield $cluster->disconnect(); // }); -// } -// -// public function testRegisterEvent() -// { -// self::loop(function () { -// $cluster = self::cluster(); -// /** @var Session $session */ -// $session = yield $cluster->connect('simplex'); -// -// yield $session->query( -// "CREATE TABLE simple (id int PRIMARY KEY, enabled boolean);" -// ); -// -// yield $session->register(Event::SCHEMA_CHANGE, function (Event $event) use ($cluster) { -// var_dump($event); -// }); -// -// yield $session->query('ALTER TABLE simple ADD name text;'); -// -// yield $cluster->disconnect(); -// }); // } public static function tearDownAfterClass(): void @@ -218,7 +197,7 @@ public static function tearDownAfterClass(): void yield $session->query("DROP KEYSPACE IF EXISTS simplex;"); - yield $cluster->disconnect(); + $session->close(); }); } } diff --git a/tests/StreamsTest.php b/tests/StreamsTest.php index 510a3ed..eacf3ee 100644 --- a/tests/StreamsTest.php +++ b/tests/StreamsTest.php @@ -14,6 +14,14 @@ class StreamsTest extends CassisTest { + public function setUp(): void + { + $prop = new \ReflectionProperty(Streams::class, 'instance'); + $prop->setAccessible(true); + $prop->setValue(Streams::class, null); + $prop->setAccessible(false); + } + public function testCreate() { $streams1 = Streams::instance(); diff --git a/tests/TypesTest.php b/tests/TypesTest.php index 74294f1..fcb0eeb 100644 --- a/tests/TypesTest.php +++ b/tests/TypesTest.php @@ -59,7 +59,7 @@ public static function setUpBeforeClass(): void yield $session->query(\trim($query)); } - yield $cluster->disconnect(); + $session->close(); }); } @@ -116,7 +116,7 @@ public function testDataTypes() self::assertEquals('200.199.198.197', (string) $result['inet_value']); self::assertEquals('0x3078303030303030', $result['blob_value']->bytes()); - yield $cluster->disconnect(); + $session->close(); } public static function tearDownAfterClass(): void @@ -128,7 +128,7 @@ public static function tearDownAfterClass(): void yield $session->query("DROP KEYSPACE simplex;"); - yield $cluster->disconnect(); + $session->close(); }); } } diff --git a/tests/UserTypesTest.php b/tests/UserTypesTest.php index 71e69de..4f36e5c 100644 --- a/tests/UserTypesTest.php +++ b/tests/UserTypesTest.php @@ -43,7 +43,7 @@ public static function setUpBeforeClass(): void yield $session->query(\trim($query)); } - yield $cluster->disconnect(); + $session->close(); }); } @@ -82,7 +82,7 @@ public function testDataTypes() self::assertArrayHasKey('comment', $result); self::assertEquals($comment, $result['comment']); - yield $cluster->disconnect(); + $session->close(); } public static function tearDownAfterClass(): void @@ -94,7 +94,7 @@ public static function tearDownAfterClass(): void yield $session->query("DROP KEYSPACE simplex;"); - yield $cluster->disconnect(); + $session->close(); }); } }