Skip to content

Commit

Permalink
fix: remove futureResponses for now
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta committed Jan 3, 2024
1 parent 31e7d62 commit cd19529
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 35 deletions.
12 changes: 6 additions & 6 deletions examples/topic-example.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ function printBanner(string $message, LoggerInterface $logger): void


// Delete test cache
$logger->info("Deleting cache $CACHE_NAME\n");
$response = $client->deleteCache($CACHE_NAME);
if ($response->asError()) {
$logger->info("Error deleting cache: " . $response->asError()->message() . "\n");
exit(1);
}
//$logger->info("Deleting cache $CACHE_NAME\n");
//$response = $client->deleteCache($CACHE_NAME);
//if ($response->asError()) {
// $logger->info("Error deleting cache: " . $response->asError()->message() . "\n");
// exit(1);
//}

printBanner("* Momento Example End *", $logger);
83 changes: 55 additions & 28 deletions src/Topic/Internal/ScsTopicClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,51 +73,78 @@ private function returnCollectionTtl(?CollectionTtl $ttl): CollectionTtl
private function processCall(UnaryCall $call)
{
[$response, $status] = $call->wait();
$this->logger->debug("response: . json_encode($response)\n");
if ($status->code !== 0) {
$this->logger->debug("Data client error: {$status->details}");
$this->logger->debug("Topic client error: {$status->details}");
throw _ErrorConverter::convert($status->code, $status->details, $call->getMetadata());
}
return $response;
}


/**
* @return ResponseFuture<TopicPublishResponse>
*/
public function publish(string $cacheName, string $topicName, string $value): ResponseFuture
public function publish(string $cacheName, string $topicName, string $value): TopicPublishResponse
{
$this->logger->info("Publishing to topic: $topicName in cache $cacheName\n");
$topicValue = new _TopicValue();
$topicValue->setText($value);
try {
validateCacheName($cacheName);
$publishRequest = new _PublishRequest();
$publishRequest->setCacheName($cacheName);
$publishRequest->setTopic($topicName);
$publishRequest->setValue($topicValue);

$call = $this->grpcManager->client->Publish($publishRequest,);
$request = new _PublishRequest();
$request->setCacheName($cacheName);
$request->setTopic($topicName);
$request->setValue($topicValue);
$call = $this->grpcManager->client->Publish($request);
$this->processCall($call);
} catch (SdkError $e) {
$this->logger->debug("Failed to publish message to topic $topicName in cache $cacheName: {$e->getMessage()}");
return ResponseFuture::createResolved(new TopicPublishResponseError($e));
} catch (Exception $e) {
return new TopicPublishResponseError($e);
} catch (\Exception $e) {
$this->logger->debug("Failed to publish message to topic $topicName in cache $cacheName: {$e->getMessage()}");
return ResponseFuture::createResolved(new TopicPublishResponseError(new UnknownError($e->getMessage())));
return new TopicPublishResponseError(new UnknownError($e->getMessage()));
}
return new TopicPublishResponseSuccess();
}

return ResponseFuture::createPending(
function () use ($call): TopicPublishResponse {
try {
$this->processCall($call);
} catch (SdkError $e) {
return new TopicPublishResponseError($e);
} catch (Exception $e) {
return new TopicPublishResponseError(new UnknownError($e->getMessage()));
}

return new TopicPublishResponseSuccess();
}
);
}
// /**
// * @return TopicPublishResponse
// */
// public function publish(string $cacheName, string $topicName, string $value): TopicPublishResponse
// {
// $topicValue = new _TopicValue();
// $topicValue->setText($value);
// try {
// $this->logger->info("Publishing to topic: $topicName in cache $cacheName\n");
// validateCacheName($cacheName);
// $publishRequest = new _PublishRequest();
// $publishRequest->setCacheName($cacheName);
// $publishRequest->setTopic($topicName);
// $publishRequest->setValue($topicValue);
// $this->logger->debug("publishRequest: . json_encode($publishRequest)\n");
//
// $call = $this->grpcManager->client->Publish($publishRequest);
// this->logger->debug("call: . json_encode($call)\n");
// } catch (SdkError $e) {
// $this->logger->debug("Failed to publish message to topic $topicName in cache $cacheName: {$e->getMessage()}");
// return ResponseFuture::createResolved(new TopicPublishResponseError($e));
// } catch (Exception $e) {
// $this->logger->debug("Failed to publish message to topic $topicName in cache $cacheName: {$e->getMessage()}");
// return ResponseFuture::createResolved(new TopicPublishResponseError(new UnknownError($e->getMessage())));
// }
//
// return ResponseFuture::createPending(
// function () use ($call): TopicPublishResponse {
// try {
// $this->processCall($call);
// } catch (SdkError $e) {
// return new TopicPublishResponseError($e);
// } catch (Exception $e) {
// return new TopicPublishResponseError(new UnknownError($e->getMessage()));
// }
//
// return new TopicPublishResponseSuccess();
// }
// );
// }


public function close(): void
Expand Down
2 changes: 1 addition & 1 deletion src/Topic/TopicClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ public function setLogger(LoggerInterface $logger): void
public function publish(string $cacheName, string $topicName, string $message): TopicPublishResponse
{
$this->logger->info("Publishing to topic: $topicName\n");
return $this->topicClient->publish($cacheName, $topicName, $message)-> wait();
return $this->topicClient->publish($cacheName, $topicName, $message);
}
}

0 comments on commit cd19529

Please sign in to comment.