Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix amphp problems #16

Merged
merged 3 commits into from
Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions examples/demo-subscription5.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Amp\Success;
use Prooph\EventStoreClient\Exception\InvalidOperationException;
use Prooph\EventStoreClient\Internal\AbstractEventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\Messages\ClientMessages\CreatePersistentSubscription;
use Throwable;
Expand Down Expand Up @@ -60,7 +59,7 @@

\var_dump($result);

$subscription = $connection->connectToPersistentSubscription(
$connection->connectToPersistentSubscriptionAsync(
'foo-bar',
'test-persistent-subscription',
new class() implements EventAppearedOnPersistentSubscription {
Expand Down Expand Up @@ -92,7 +91,4 @@ public function __invoke(
true,
new UserCredentials('admin', 'changeit')
);

/** @var EventStorePersistentSubscription $subscription */
$subscription = yield $subscription->start();
});
94 changes: 0 additions & 94 deletions examples/demo-subscription6.php

This file was deleted.

5 changes: 4 additions & 1 deletion src/ClientOperations/AbstractSubscriptionOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace Prooph\EventStoreClient\ClientOperations;

use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Generator;
Expand Down Expand Up @@ -371,7 +372,9 @@ private function executeActionAsync(callable $action): void
$this->dropSubscription(SubscriptionDropReason::userInitiated(), new \Exception('client buffer too big'));
}

Promise\rethrow($this->executeActions());
Loop::defer(function (): Generator {
yield $this->executeActions();
});
}

/** @return Promise<void> */
Expand Down
11 changes: 0 additions & 11 deletions src/EventStoreAsyncConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use Amp\Promise;
use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ListenerHandler;

Expand Down Expand Up @@ -197,16 +196,6 @@ public function subscribeToAllFrom(
?UserCredentials $userCredentials = null
): EventStoreAllCatchUpSubscription;

public function connectToPersistentSubscription(
string $stream,
string $groupName,
EventAppearedOnPersistentSubscription $eventAppeared,
?PersistentSubscriptionDropped $subscriptionDropped = null,
int $bufferSize = 10,
bool $autoAck = true,
?UserCredentials $userCredentials = null
): EventStorePersistentSubscription;

/**
* @return Promise<AbstractEventStorePersistentSubscription>
*/
Expand Down
6 changes: 1 addition & 5 deletions src/Internal/AbstractEventStorePersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,7 @@ private function enqueue(PersistentSubscriptionResolvedEvent $resolvedEvent): vo
$this->isProcessing = true;

Loop::defer(function (): Generator {
$promise = $this->processQueue();

Promise\rethrow($promise);

yield $promise;
yield $this->processQueue();
});
}
}
Expand Down
40 changes: 2 additions & 38 deletions src/Internal/EventStoreAsyncNodeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ public function subscribeToStreamFrom(
$settings
);

Promise\rethrow($catchUpSubscription->startAsync());
$catchUpSubscription->startAsync();

return $catchUpSubscription;
}
Expand Down Expand Up @@ -671,47 +671,11 @@ public function subscribeToAllFrom(
$settings
);

Promise\rethrow($catchUpSubscription->startAsync());
$catchUpSubscription->startAsync();

return $catchUpSubscription;
}

public function connectToPersistentSubscription(
string $stream,
string $groupName,
EventAppearedOnPersistentSubscription $eventAppeared,
?PersistentSubscriptionDropped $subscriptionDropped = null,
int $bufferSize = 10,
bool $autoAck = true,
?UserCredentials $userCredentials = null
): EventStorePersistentSubscription {
if (empty($stream)) {
throw new InvalidArgumentException('Stream cannot be empty');
}

if (empty($groupName)) {
throw new InvalidArgumentException('Group cannot be empty');
}

$subscription = new EventStorePersistentSubscription(
$groupName,
$stream,
$eventAppeared,
$subscriptionDropped,
$userCredentials,
$this->settings->log(),
$this->settings->verboseLogging(),
$this->settings,
$this->handler,
$bufferSize,
$autoAck
);

Promise\rethrow($subscription->start());

return $subscription;
}

/** {@inheritdoc} */
public function connectToPersistentSubscriptionAsync(
string $stream,
Expand Down
10 changes: 2 additions & 8 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ private function onReconnect(ClientConnectionEventArgs $clientConnectionEventArg
$this->connection->detach($this->connectListener);

Loop::defer(function (): Generator {
$promise = $this->runSubscriptionAsync();
Promise\rethrow($promise);

yield $promise;
yield $this->runSubscriptionAsync();
});
}

Expand Down Expand Up @@ -462,10 +459,7 @@ private function ensureProcessingPushQueue(): void
$this->isProcessing = true;

Loop::defer(function (): Generator {
$promise = $this->processLiveQueueAsync();
Promise\rethrow($promise);

yield $promise;
yield $this->processLiveQueueAsync();
});
}
}
Expand Down
8 changes: 1 addition & 7 deletions src/Internal/EventStoreConnectionLogicHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
use Prooph\EventStoreClient\SystemData\TcpPackage;
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;
use Throwable;
use function Amp\Promise\rethrow;

/** @internal */
class EventStoreConnectionLogicHandler
Expand Down Expand Up @@ -250,8 +249,6 @@ private function discoverEndPoint(?Deferred $deferred): void
$deferred->resolve(null);
}
});

rethrow($promise);
}

/** @throws \Exception */
Expand Down Expand Up @@ -334,10 +331,7 @@ function (TcpPackageConnection $connection, Throwable $exception): void {
);

Loop::defer(function (): Generator {
$promise = $this->connection->connectAsync();
rethrow($promise);

yield $promise;
yield $this->connection->connectAsync();

if (! $this->connection->isClosed()) {
$this->connection->startReceiving();
Expand Down
23 changes: 0 additions & 23 deletions src/Internal/EventStoreSyncNodeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use Prooph\EventStoreClient\ClusterSettings;
use Prooph\EventStoreClient\ConditionalWriteResult;
use Prooph\EventStoreClient\ConnectionSettings;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventReadResult;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\EventStoreSyncConnection;
Expand All @@ -31,7 +30,6 @@
use Prooph\EventStoreClient\StreamEventsSlice;
use Prooph\EventStoreClient\StreamMetadata;
use Prooph\EventStoreClient\StreamMetadataResult;
use Prooph\EventStoreClient\SubscriptionDropped;
use Prooph\EventStoreClient\SystemSettings;
use Prooph\EventStoreClient\UserCredentials;
use Prooph\EventStoreClient\WriteResult;
Expand Down Expand Up @@ -273,27 +271,6 @@ public function deletePersistentSubscription(
return Promise\wait($this->asyncConnection->deletePersistentSubscriptionAsync($stream, $groupName, $userCredentials));
}

/** @throws Throwable */
public function connectToPersistentSubscription(
string $stream,
string $groupName,
EventAppearedOnSubscription $eventAppeared,
?SubscriptionDropped $subscriptionDropped = null,
int $bufferSize = 10,
bool $autoAck = true,
?UserCredentials $userCredentials = null
): EventStorePersistentSubscription {
return $this->asyncConnection->connectToPersistentSubscription(
$stream,
$groupName,
$eventAppeared,
$subscriptionDropped,
$bufferSize,
$autoAck,
$userCredentials
);
}

public function startTransaction(
string $stream,
int $expectedVersion,
Expand Down
5 changes: 2 additions & 3 deletions tests/SpecificationWithConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected function given(): Generator
abstract protected function when();

/** @throws Throwable */
protected function executeCallback(callable $test): void
protected function execute(callable $test): void
{
wait(call(function () use ($test) {
$this->conn = TestConnection::createAsync();
Expand All @@ -52,8 +52,7 @@ protected function executeCallback(callable $test): void

protected function end(): Generator
{
// @todo is this a bug?
//$this->conn->close();
$this->conn->close();

yield new Success();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected function given(): Generator
$this->resetEvent->resolve(true);
};

$this->conn->connectToPersistentSubscription(
yield $this->conn->connectToPersistentSubscriptionAsync(
$this->stream,
$this->group,
new class() implements EventAppearedOnPersistentSubscription {
Expand Down Expand Up @@ -129,7 +129,7 @@ protected function when(): Generator
*/
public function the_subscription_gets_dropped(): void
{
$this->executeCallback(function (): Generator {
$this->execute(function (): Generator {
try {
$result = yield Promise\timeout($this->resetEvent->promise(), 5000);
} catch (TimeoutException $e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected function when(): Generator
*/
public function the_completion_succeeds(): void
{
$this->executeCallback(function () {
$this->execute(function () {
yield $this->conn->createPersistentSubscriptionAsync(
'someother' . $this->stream,
'group3211',
Expand Down
Loading