diff --git a/examples/grpc/hello-world/src/index.ts b/examples/grpc/hello-world/src/index.ts index 489b01829..cf33c5838 100644 --- a/examples/grpc/hello-world/src/index.ts +++ b/examples/grpc/hello-world/src/index.ts @@ -1,18 +1,25 @@ -import Dapr, { HttpMethod } from "@roadwork/dapr-js-sdk/grpc"; +import { DaprServer, DaprClient, HttpMethod } from "@roadwork/dapr-js-sdk/grpc"; const daprHost = "127.0.0.1"; -const daprPort = "50050"; // gRPC Port for Dapr Client -const daprInternalServerPort = "50051"; // gRPC Port for Dapr Server +const daprPort = "50050"; // Dapr Sidecar Port of this Example Server +const daprPortActor = "10002"; // Dapr Sidecar Port of the Actor Server +const daprInternalServerPort = "50051"; // App Port of this Example Server const daprAppId = "example-hello-world"; +async function sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + async function start() { - const client = new Dapr(daprHost, daprPort, daprInternalServerPort); + const server = new DaprServer(daprHost, daprPort, daprInternalServerPort); + const client = new DaprClient(daprHost, daprPort); + const clientActor = new DaprClient(daprHost, daprPortActor); - // console.log("==============================================================="); - // console.log("REGISTERING SERVER HANDLERS") - // console.log("==============================================================="); - // await client.pubsub.subscribe("pubsub-redis", "test-topic", async (data: any) => console.log(`[Dapr-JS][Example][PubSub Subscribe CB] Data: ${data}`)); - // await client.binding.receive("binding-rabbitmq", async (data: any) => console.log(`[Dapr-JS][Example][Binding Receive CB] Data: ${data}`)); + console.log("==============================================================="); + console.log("REGISTERING SERVER HANDLERS") + console.log("==============================================================="); + await server.binding.receive("binding-rabbitmq", async (data) => console.log(`[Dapr-JS][Example][Binding Receive] Got Data: ${JSON.stringify(data)}`)); + await server.pubsub.subscribe("pubsub-redis", "test-topic", async (data) => console.log(`[Dapr-JS][Example][PubSub Subscribe] Got Data: ${JSON.stringify(data)}`)); console.log("==============================================================="); console.log("INITIALIZING") @@ -20,20 +27,19 @@ async function start() { // We initialize after registering our listeners since these should be defined upfront // this is how Dapr works, it waits until we are listening on the port. Once that is detected // it will scan the binding list and pubsub subscriptions list to process - await client.startServer(); - await client.startClient(); + await server.startServer(); console.log("==============================================================="); console.log("EXECUTING CLIENT -INVOKER") console.log("==============================================================="); - await client.invoker.listen("hello-world", async (data: any) => { + await server.invoker.listen("hello-world", async (data: any) => { console.log("[Dapr-JS][Example] POST /hello-world"); console.log(`[Dapr-JS][Example] Received: ${JSON.stringify(data.body)}`); console.log(`[Dapr-JS][Example] Replying to Client`); return { hello: "world received from POST" }; }, { method: HttpMethod.POST }); - await client.invoker.listen("hello-world", async () => { + await server.invoker.listen("hello-world", async () => { console.log("[Dapr-JS][Example] GET /hello-world"); console.log(`[Dapr-JS][Example] Replying to Client`); return { hello: "world received from GET" }; @@ -55,6 +61,8 @@ async function start() { const resPubSub = await client.pubsub.publish("pubsub-redis", "test-topic", { hello: "world" }); console.log(`[Dapr-JS][Example][PubSub RES] Data: ${JSON.stringify(resPubSub)}`); + await sleep(500); // wait a bit to receive the messages + console.log("==============================================================="); console.log("EXECUTING CLIENT - SECRETS"); console.log("==============================================================="); @@ -117,8 +125,6 @@ async function start() { console.log("EXECUTING CLIENT - ACTORS"); console.log("Note: we create new client for now since Actors are not supported internally!") console.log("==============================================================="); - const clientActor = new Dapr(daprHost, "10002"); - await clientActor.startClient(); await clientActor.actor.invoke("POST", "DemoActor", "MyActorId1", "SetDataAsync", { PropertyA: "hello", PropertyB: "world", ToNotExistKey: "this should not exist since we only have PropertyA and PropertyB" }); const resActorInvoke = await clientActor.actor.invoke("GET", "DemoActor", "MyActorId1", "GetDataAsync"); // will only return PropertyA and PropertyB since these are the only properties that can be set console.log(`[Dapr-JS][Example][Actors] Invoked Method and got data: ${JSON.stringify(resActorInvoke)}`); diff --git a/examples/http/hello-world/src/index.ts b/examples/http/hello-world/src/index.ts index b82bf4729..ccb4b863f 100644 --- a/examples/http/hello-world/src/index.ts +++ b/examples/http/hello-world/src/index.ts @@ -1,8 +1,9 @@ import Dapr, { HttpMethod } from "@roadwork/dapr-js-sdk/http"; const daprHost = "127.0.0.1"; -const daprPort = "50050"; // HTTP Port for Dapr Client -const daprInternalServerPort = "50051"; // HTTP Port for Dapr Server +const daprPort = "50050"; // Dapr Sidecar Port of this Example Server +const daprPortActor = "10002"; // Dapr Sidecar Port of the Actor Server +const daprInternalServerPort = "50051"; // App Port of this Example Server const daprAppId = "example-hello-world"; async function sleep(ms: number): Promise { @@ -10,13 +11,15 @@ async function sleep(ms: number): Promise { } async function start() { - const client = new Dapr(daprHost, daprPort, daprInternalServerPort); + const server = new DaprServer(daprHost, daprPort, daprInternalServerPort); + const client = new DaprClient(daprHost, daprPort); + const clientActor = new DaprClient(daprHost, daprPortActor); console.log("==============================================================="); console.log("REGISTERING SERVER HANDLERS") console.log("==============================================================="); - await client.binding.receive("binding-rabbitmq", async (data) => console.log(`[Dapr-JS][Example][Binding Receive] Got Data: ${JSON.stringify(data)}`)); - await client.pubsub.subscribe("pubsub-redis", "test-topic", async (data) => console.log(`[Dapr-JS][Example][PubSub Subscribe] Got Data: ${JSON.stringify(data)}`)); + await server.binding.receive("binding-rabbitmq", async (data) => console.log(`[Dapr-JS][Example][Binding Receive] Got Data: ${JSON.stringify(data)}`)); + await server.pubsub.subscribe("pubsub-redis", "test-topic", async (data) => console.log(`[Dapr-JS][Example][PubSub Subscribe] Got Data: ${JSON.stringify(data)}`)); console.log("==============================================================="); console.log("INITIALIZING") @@ -24,8 +27,7 @@ async function start() { // We initialize after registering our listeners since these should be defined upfront // this is how Dapr works, it waits until we are listening on the port. Once that is detected // it will scan the binding list and pubsub subscriptions list to process - await client.startServer(); - await client.startClient(); + await server.startServer(); console.log("==============================================================="); console.log("EXECUTING CLIENT -INVOKER") @@ -59,7 +61,7 @@ async function start() { await client.pubsub.publish("pubsub-redis", "test-topic", { hello: "world" }); console.log(`[Dapr-JS][Example][PubSub] Published to pubsub pubsub-redis on topic "test-topic"`); - await sleep(1000); // wait a bit to receive the messages + await sleep(500); // wait a bit to receive the messages console.log("==============================================================="); console.log("EXECUTING CLIENT - SECRETS"); @@ -123,12 +125,10 @@ async function start() { console.log("EXECUTING CLIENT - ACTORS"); console.log("Note: we create new client for now since Actors are not supported internally!") console.log("==============================================================="); - const clientActor = new Dapr(daprHost, "10002"); - await clientActor.startClient(); await clientActor.actor.invoke("POST", "DemoActor", "MyActorId1", "SetDataAsync", { PropertyA: "hello", PropertyB: "world", ToNotExistKey: "this should not exist since we only have PropertyA and PropertyB" }); const resActorInvoke = await clientActor.actor.invoke("GET", "DemoActor", "MyActorId1", "GetDataAsync"); // will only return PropertyA and PropertyB since these are the only properties that can be set console.log(`[Dapr-JS][Example][Actors] Invoked Method and got data: ${JSON.stringify(resActorInvoke)}`); - + await clientActor.actor.stateTransaction("DemoActor", "MyActorId1", [ { operation: "upsert", diff --git a/src/grpc/Dapr.ts b/src/grpc/Dapr.ts deleted file mode 100644 index 38f49a99f..000000000 --- a/src/grpc/Dapr.ts +++ /dev/null @@ -1,49 +0,0 @@ -import DaprBinding from './lib/binding'; -import DaprPubSub from './lib/pubsub'; -import DaprState from './lib/state'; -import GRPCClientSingleton from './lib/GRPCClient/GRPCClientSingleton'; -import GRPCServerSingleton from './lib/GRPCServer/GRPCServerSingleton'; -import DaprInvoker from './lib/invoker'; -import DaprSecret from './lib/secret'; -import DaprActor from './lib/actor'; - -export default class Dapr { - daprHost: string; - daprPort: string; - daprInternalServerPort: string; // The port for our app server (e.g. dapr binding receives, pubsub receive, ...) - pubsub: DaprPubSub; - state: DaprState; - binding: DaprBinding; - invoker: DaprInvoker; - secret: DaprSecret; - actor: DaprActor; - - constructor(daprHost: string, daprPort: string, daprInternalServerPort: string = "50050") { - this.daprHost = daprHost || '127.0.0.1'; - this.daprPort = daprPort || "5005"; - this.daprInternalServerPort = process.env.DAPR_INTERNAL_SERVER_PORT || daprInternalServerPort; - - // If DAPR_INTERNAL_SERVER_PORT was not set, we set it - // This will be fetched by the GRPCServerSingleton - process.env.DAPR_INTERNAL_SERVER_PORT = this.daprPort; - - // // Get the App Port as set in the Dapr constructor - // const randomPort = Math.floor(Math.random() * (20000 - 10000 + 1)) + 10000; - // const appPort = parseInt(process.env.DAPR_INTERNAL_SERVER_PORT || "", 10) || randomPort; - - this.state = new DaprState(); - this.pubsub = new DaprPubSub(); - this.binding = new DaprBinding(); - this.invoker = new DaprInvoker(); - this.secret = new DaprSecret(); - this.actor = new DaprActor(); - } - - async startClient() { - await GRPCClientSingleton.initialize(this.daprHost, this.daprPort.toString()); - } - - async startServer() { - await GRPCServerSingleton.startServer(this.daprHost, this.daprInternalServerPort.toString()); - } -} \ No newline at end of file diff --git a/src/grpc/DaprClient.ts b/src/grpc/DaprClient.ts new file mode 100644 index 000000000..79cdf38cd --- /dev/null +++ b/src/grpc/DaprClient.ts @@ -0,0 +1,33 @@ +import DaprClientBinding from './lib/GRPCClient/binding'; +import DaprClientPubSub from './lib/GRPCClient/pubsub'; +import DaprClientState from './lib/GRPCClient/state'; +import DaprClientInvoker from './lib/GRPCClient/invoker'; +import DaprClientSecret from './lib/GRPCClient/secret'; +import DaprClientActor from './lib/GRPCClient/actor'; +import GRPCClient from './lib/GRPCClient/GRPCClient'; + +export default class DaprClient { + daprHost: string; + daprPort: string; + daprClient: GRPCClient; + pubsub: DaprClientPubSub; + state: DaprClientState; + binding: DaprClientBinding; + invoker: DaprClientInvoker; + secret: DaprClientSecret; + actor: DaprClientActor; + + constructor(daprHost: string, daprPort: string) { + this.daprHost = daprHost || '127.0.0.1'; + this.daprPort = daprPort || "5005"; + + this.daprClient = new GRPCClient(daprHost, daprPort); + + this.state = new DaprClientState(this.daprClient); + this.pubsub = new DaprClientPubSub(this.daprClient); + this.binding = new DaprClientBinding(this.daprClient); + this.invoker = new DaprClientInvoker(this.daprClient); + this.secret = new DaprClientSecret(this.daprClient); + this.actor = new DaprClientActor(this.daprClient); + } +} \ No newline at end of file diff --git a/src/grpc/DaprServer.ts b/src/grpc/DaprServer.ts new file mode 100644 index 000000000..dc255d59c --- /dev/null +++ b/src/grpc/DaprServer.ts @@ -0,0 +1,40 @@ +import DaprServerBinding from './lib/GRPCServer/binding'; +import DaprServerPubSub from './lib/GRPCServer/pubsub'; +import DaprServerInvoker from './lib/GRPCServer/invoker'; +import DaprServerActor from './lib/GRPCServer/actor'; +import GRPCServer from './lib/GRPCServer/GRPCServer'; + +export default class DaprServer { + daprHost: string; + daprPort: string; + daprInternalServerPort: string; // The port for our app server (e.g. dapr binding receives, pubsub receive, ...)\ + daprServer: GRPCServer; + pubsub: DaprServerPubSub; + binding: DaprServerBinding; + invoker: DaprServerInvoker; + actor: DaprServerActor; + + constructor(daprHost: string, daprPort: string, daprInternalServerPort: string = "50050") { + this.daprHost = daprHost || '127.0.0.1'; + this.daprPort = daprPort || "5005"; + this.daprInternalServerPort = process.env.DAPR_INTERNAL_SERVER_PORT || daprInternalServerPort; + this.daprServer = new GRPCServer(this.daprHost, this.daprInternalServerPort); + + // If DAPR_INTERNAL_SERVER_PORT was not set, we set it + // This will be fetched by the GRPCServerSingleton + process.env.DAPR_INTERNAL_SERVER_PORT = this.daprPort; + + // // Get the App Port as set in the Dapr constructor + // const randomPort = Math.floor(Math.random() * (20000 - 10000 + 1)) + 10000; + // const appPort = parseInt(process.env.DAPR_INTERNAL_SERVER_PORT || "", 10) || randomPort; + + this.pubsub = new DaprServerPubSub(this.daprServer); + this.binding = new DaprServerBinding(this.daprServer); + this.invoker = new DaprServerInvoker(this.daprServer); + this.actor = new DaprServerActor(this.daprServer); + } + + async startServer() { + await this.daprServer.startServer(this.daprHost, this.daprInternalServerPort.toString()); + } +} \ No newline at end of file diff --git a/src/grpc/index.ts b/src/grpc/index.ts index 1ff3eda0b..bdb92b2ef 100644 --- a/src/grpc/index.ts +++ b/src/grpc/index.ts @@ -1,14 +1,15 @@ -import Dapr from './Dapr'; +import DaprClient from './DaprClient'; +import DaprServer from './DaprServer'; import { Request as Req, Response as Res } from 'restana'; import { HttpMethod } from './enum/HttpMethod.enum'; import HttpStatusCode from './enum/HttpStatusCode.enum'; -export default Dapr; - export { HttpMethod, HttpStatusCode, Req, - Res + Res, + DaprClient, + DaprServer } \ No newline at end of file diff --git a/src/grpc/lib/GRPCClient/GRPCClient.ts b/src/grpc/lib/GRPCClient/GRPCClient.ts index 1a89e8e03..5ceee14b1 100644 --- a/src/grpc/lib/GRPCClient/GRPCClient.ts +++ b/src/grpc/lib/GRPCClient/GRPCClient.ts @@ -18,4 +18,8 @@ export default class GRPCClient { console.log(`[Dapr-JS][gRPC] Opening connection to ${this.clientHost}:${this.clientPort}`); this.client = new DaprClient(`${this.clientHost}:${this.clientPort}`, this.clientCredentials); } + + getClient() { + return this.client; + } } \ No newline at end of file diff --git a/src/grpc/lib/GRPCClient/GRPCClientSingleton.ts b/src/grpc/lib/GRPCClient/GRPCClientSingleton.ts deleted file mode 100644 index a4daea929..000000000 --- a/src/grpc/lib/GRPCClient/GRPCClientSingleton.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { DaprClient } from "../../proto/dapr/proto/runtime/v1/dapr_grpc_pb"; -import GRPCClient from "./GRPCClient"; - -export default class GRPCClientSingleton { - private static instance?: GRPCClient; - - /** - * Private constructor to prevent direct construction calls - */ - private constructor() {} - - public static async initialize(host: string, port: string): Promise { - const instance = new GRPCClient(host, port); - this.instance = instance; - - console.log("[Dapr-JS] Created GRPC Client Singleton"); - } - - /** - * The static method that controls the access to the singleton instance. - * - * This implementation let you subclass the Singleton class while keeping - * just one instance of each subclass around. - */ - public static async getInstance(): Promise { - if (!this.instance) { - throw new Error(JSON.stringify({ - error: "GRPC_CLIENT_NOT_INITIALIZED", - error_message: "The gRPC client was not initialized, did you call `await GRPCClientSingleton.initialize()`?" - })) - } - - return this.instance; - } - - public static async getClient(): Promise { - const singleton = await this.getInstance(); - return singleton.client; - } - - public static async destroy(): Promise { - if (!this.instance) { - return; - } - - delete this.instance; - } -} \ No newline at end of file diff --git a/src/grpc/lib/actor.ts b/src/grpc/lib/GRPCClient/actor.ts similarity index 91% rename from src/grpc/lib/actor.ts rename to src/grpc/lib/GRPCClient/actor.ts index 105fb9b29..00b82bb43 100644 --- a/src/grpc/lib/actor.ts +++ b/src/grpc/lib/GRPCClient/actor.ts @@ -1,13 +1,19 @@ -import { OperationType } from '../types/Operation.type'; -import { ActorReminderType } from '../types/ActorReminder.type'; -import { ActorTimerType } from '../types/ActorTimer.type'; -import { ExecuteActorStateTransactionRequest, GetActorStateRequest, GetActorStateResponse, GetMetadataResponse, InvokeActorRequest, InvokeActorResponse, RegisterActorReminderRequest, RegisterActorTimerRequest, TransactionalActorStateOperation, UnregisterActorReminderRequest, UnregisterActorTimerRequest } from '../proto/dapr/proto/runtime/v1/dapr_pb'; -import GRPCClientSingleton from './GRPCClient/GRPCClientSingleton'; +import { OperationType } from '../../types/Operation.type'; +import { ActorReminderType } from '../../types/ActorReminder.type'; +import { ActorTimerType } from '../../types/ActorTimer.type'; +import { ExecuteActorStateTransactionRequest, GetActorStateRequest, GetActorStateResponse, GetMetadataResponse, InvokeActorRequest, InvokeActorResponse, RegisterActorReminderRequest, RegisterActorTimerRequest, TransactionalActorStateOperation, UnregisterActorReminderRequest, UnregisterActorTimerRequest } from '../../proto/dapr/proto/runtime/v1/dapr_pb'; import { Empty } from "google-protobuf/google/protobuf/empty_pb"; import { Any } from "google-protobuf/google/protobuf/any_pb"; +import GRPCClient from './GRPCClient'; // https://docs.dapr.io/reference/api/actors_api/ export default class DaprActor { + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; + } + async invoke(method: "GET" | "POST" | "PUT" | "DELETE", actorType: string, actorId: string, methodName: string, body?: object): Promise { const msgService = new InvokeActorRequest(); msgService.setActorId(actorId) @@ -19,7 +25,7 @@ export default class DaprActor { } return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.invokeActor(msgService, (err, res: InvokeActorResponse) => { if (err) { return reject(err); @@ -58,7 +64,7 @@ export default class DaprActor { msgService.setOperationsList(transactionItems); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.executeActorStateTransaction(msgService, (err, res) => { if (err) { return reject(err); @@ -77,7 +83,7 @@ export default class DaprActor { msgService.setKey(key); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.getActorState(msgService, (err, res: GetActorStateResponse) => { if (err) { return reject(err); @@ -114,7 +120,7 @@ export default class DaprActor { } return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.registerActorReminder(msgService, (err, res) => { if (err) { return reject(err); @@ -165,7 +171,7 @@ export default class DaprActor { msgService.setName(name); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.unregisterActorReminder(msgService, (err, res) => { if (err) { return reject(err); @@ -200,7 +206,7 @@ export default class DaprActor { } return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.registerActorTimer(msgService, (err, res) => { if (err) { return reject(err); @@ -219,7 +225,7 @@ export default class DaprActor { msgService.setName(name); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.unregisterActorTimer(msgService, (err, res) => { if (err) { return reject(err); @@ -253,7 +259,7 @@ export default class DaprActor { async getActors(): Promise { return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.getMetadata(new Empty(), (err, res: GetMetadataResponse) => { if (err) { diff --git a/src/grpc/lib/GRPCClient/binding.ts b/src/grpc/lib/GRPCClient/binding.ts new file mode 100644 index 000000000..461efdae8 --- /dev/null +++ b/src/grpc/lib/GRPCClient/binding.ts @@ -0,0 +1,37 @@ +import { InvokeBindingRequest, InvokeBindingResponse } from '../../proto/dapr/proto/runtime/v1/dapr_pb'; +import GRPCClient from './GRPCClient'; + +// https://docs.dapr.io/reference/api/bindings_api/ +export default class DaprBinding { + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; + } + + // Send an event to an external system + // @todo: should pass the metadata object + // @todo: should return a specific typed Promise instead of Promise + async send(bindingName: string, operation: string, data: any, metadata: object = {}): Promise { + const msgService = new InvokeBindingRequest(); + msgService.setName(bindingName); + msgService.setOperation(operation); + msgService.setData(Buffer.from(JSON.stringify(data), "utf-8")); + + return new Promise(async (resolve, reject) => { + const client = this.client.getClient(); + client.invokeBinding(msgService, (err, res: InvokeBindingResponse) => { + if (err) { + return reject(err); + } + + // https://docs.dapr.io/reference/api/bindings_api/#payload + return resolve({ + data: res.getData(), + metadata: res.getMetadataMap(), + operation + }); + }); + }); + } +} diff --git a/src/grpc/lib/invoker.ts b/src/grpc/lib/GRPCClient/invoker.ts similarity index 68% rename from src/grpc/lib/invoker.ts rename to src/grpc/lib/GRPCClient/invoker.ts index 3de56e2dd..c196e7904 100644 --- a/src/grpc/lib/invoker.ts +++ b/src/grpc/lib/GRPCClient/invoker.ts @@ -1,22 +1,17 @@ -import { TypeDaprInvokerCallback } from '../types/DaprInvokerCallback.type'; -import { InvokerListenOptionsType } from '../types/InvokerListenOptions.type'; -import { HttpMethod } from '../enum/HttpMethod.enum'; -import { HTTPExtension, InvokeRequest, InvokeResponse } from '../proto/dapr/proto/common/v1/common_pb'; -import { InvokeServiceRequest } from '../proto/dapr/proto/runtime/v1/dapr_pb'; +import { HttpMethod } from '../../enum/HttpMethod.enum'; +import { HTTPExtension, InvokeRequest, InvokeResponse } from '../../proto/dapr/proto/common/v1/common_pb'; +import { InvokeServiceRequest } from '../../proto/dapr/proto/runtime/v1/dapr_pb'; import { Any } from "google-protobuf/google/protobuf/any_pb"; -import GRPCServerSingleton from './GRPCServer/GRPCServerSingleton'; -import GRPCClientSingleton from './GRPCClient/GRPCClientSingleton'; - -import * as HttpVerbUtil from "../utils/HttpVerb.util"; +import * as HttpVerbUtil from "../../utils/HttpVerb.util"; +import GRPCClient from './GRPCClient'; // https://docs.dapr.io/reference/api/service_invocation_api/ export default class DaprInvoker { - async listen(methodName: string, cb: TypeDaprInvokerCallback, options: InvokerListenOptionsType = {}) { - const httpMethod: HttpMethod = options?.method?.toLowerCase() as HttpMethod || HttpMethod.GET; - const server = await GRPCServerSingleton.getServerImpl(); - console.log(`Registering onInvoke Handler ${httpMethod} /${methodName}`); - server.registerOnInvokeHandler(httpMethod, methodName, cb); + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; } // @todo: should return a specific typed Promise instead of Promise @@ -56,7 +51,7 @@ export default class DaprInvoker { msgInvokeService.setMessage(msgInvoke); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.invokeService(msgInvokeService, (err, res: InvokeResponse) => { if (err) { return reject(err); diff --git a/src/grpc/lib/pubsub.ts b/src/grpc/lib/GRPCClient/pubsub.ts similarity index 53% rename from src/grpc/lib/pubsub.ts rename to src/grpc/lib/GRPCClient/pubsub.ts index a0abfff0b..8e2414cff 100644 --- a/src/grpc/lib/pubsub.ts +++ b/src/grpc/lib/GRPCClient/pubsub.ts @@ -1,10 +1,14 @@ -import { PublishEventRequest } from "../proto/dapr/proto/runtime/v1/dapr_pb"; -import { TypeDaprPubSubCallback } from "../types/DaprPubSubCallback.type"; -import GRPCClientSingleton from "./GRPCClient/GRPCClientSingleton"; -import GRPCServerSingleton from "./GRPCServer/GRPCServerSingleton"; +import { PublishEventRequest } from "../../proto/dapr/proto/runtime/v1/dapr_pb"; +import GRPCClient from './GRPCClient'; // https://docs.dapr.io/reference/api/pubsub_api/ export default class DaprPubSub { + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; + } + // @todo: should return a specific typed Promise instead of Promise async publish(pubSubName: string, topic: string, data: object = {}): Promise { const msgService = new PublishEventRequest(); @@ -13,7 +17,7 @@ export default class DaprPubSub { msgService.setData(Buffer.from(JSON.stringify(data), "utf-8")); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.publishEvent(msgService, (err, res) => { if (err) { return reject(err); @@ -24,10 +28,4 @@ export default class DaprPubSub { }); }); } - - async subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback) { - const server = await GRPCServerSingleton.getServerImpl(); - console.log(`Registering onTopicEvent Handler: PubSub = ${pubSubName}; Topic = ${topic}`); - server.registerPubSubSubscriptionHandler(pubSubName, topic, cb); - } } diff --git a/src/grpc/lib/secret.ts b/src/grpc/lib/GRPCClient/secret.ts similarity index 82% rename from src/grpc/lib/secret.ts rename to src/grpc/lib/GRPCClient/secret.ts index 27fed13e9..ebe8ad71d 100644 --- a/src/grpc/lib/secret.ts +++ b/src/grpc/lib/GRPCClient/secret.ts @@ -1,8 +1,14 @@ -import { GetBulkSecretRequest, GetBulkSecretResponse, GetSecretRequest, GetSecretResponse } from "../proto/dapr/proto/runtime/v1/dapr_pb"; -import GRPCClientSingleton from "./GRPCClient/GRPCClientSingleton"; +import { GetBulkSecretRequest, GetBulkSecretResponse, GetSecretRequest, GetSecretResponse } from "../../proto/dapr/proto/runtime/v1/dapr_pb"; +import GRPCClient from './GRPCClient'; // https://docs.dapr.io/reference/api/secrets_api/ export default class DaprSecret { + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; + } + // @todo: implement metadata async get(secretStoreName: string, key: string, metadata: string = ""): Promise { const msgService = new GetSecretRequest(); @@ -10,7 +16,7 @@ export default class DaprSecret { msgService.setKey(key); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.getSecret(msgService, (err, res: GetSecretResponse) => { if (err) { return reject(err); @@ -29,7 +35,7 @@ export default class DaprSecret { msgService.setStoreName(secretStoreName); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.getBulkSecret(msgService, (err, res: GetBulkSecretResponse) => { if (err) { return reject(err); diff --git a/src/grpc/lib/state.ts b/src/grpc/lib/GRPCClient/state.ts similarity index 87% rename from src/grpc/lib/state.ts rename to src/grpc/lib/GRPCClient/state.ts index 6660857a1..e5a603eb8 100644 --- a/src/grpc/lib/state.ts +++ b/src/grpc/lib/GRPCClient/state.ts @@ -1,12 +1,18 @@ -import { IKeyValuePair } from '../types/KeyValuePair.type'; -import { OperationType } from '../types/Operation.type'; -import { IRequestMetadata } from '../types/RequestMetadata.type'; -import { DeleteStateRequest, ExecuteStateTransactionRequest, GetBulkStateRequest, GetBulkStateResponse, GetStateRequest, GetStateResponse, SaveStateRequest, TransactionalStateOperation } from '../proto/dapr/proto/runtime/v1/dapr_pb'; -import GRPCClientSingleton from './GRPCClient/GRPCClientSingleton'; -import { Etag, StateItem, StateOptions } from '../proto/dapr/proto/common/v1/common_pb'; +import { IKeyValuePair } from '../../types/KeyValuePair.type'; +import { OperationType } from '../../types/Operation.type'; +import { IRequestMetadata } from '../../types/RequestMetadata.type'; +import { DeleteStateRequest, ExecuteStateTransactionRequest, GetBulkStateRequest, GetBulkStateResponse, GetStateRequest, GetStateResponse, SaveStateRequest, TransactionalStateOperation } from '../../proto/dapr/proto/runtime/v1/dapr_pb'; +import { Etag, StateItem, StateOptions } from '../../proto/dapr/proto/common/v1/common_pb'; +import GRPCClient from './GRPCClient'; // https://docs.dapr.io/reference/api/state_api/ export default class DaprState { + client: GRPCClient; + + constructor(client: GRPCClient) { + this.client = client; + } + async save(storeName: string, stateObjects: IKeyValuePair[]): Promise { const stateList = []; @@ -22,7 +28,7 @@ export default class DaprState { msgService.setStatesList(stateList); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.saveState(msgService, (err, res) => { if (err) { return reject(err); @@ -43,7 +49,7 @@ export default class DaprState { // msgService.setConsistency() return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.getState(msgService, (err, res: GetStateResponse) => { if (err) { return reject(err); @@ -68,7 +74,7 @@ export default class DaprState { // msgService.setConsistency() return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.getBulkState(msgService, (err, res: GetBulkStateResponse) => { if (err) { return reject(err); @@ -96,7 +102,7 @@ export default class DaprState { // msgService.setOptions(); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.deleteState(msgService, (err, res) => { if (err) { return reject(err); @@ -112,7 +118,7 @@ export default class DaprState { const transactionItems: TransactionalStateOperation[] = []; for (const o of operations) { - const si = new StateItem(); + const si = new StateItem(); si.setKey(o.request.key); si.setValue(Buffer.from(o.request.value || "", "utf-8")); @@ -143,7 +149,7 @@ export default class DaprState { msgService.setOperationsList(transactionItems); return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); + const client = this.client.getClient(); client.executeStateTransaction(msgService, (err, res) => { if (err) { return reject(err); diff --git a/src/grpc/lib/GRPCServer/GRPCServer.ts b/src/grpc/lib/GRPCServer/GRPCServer.ts index 8566b82ae..13e3efbfd 100644 --- a/src/grpc/lib/GRPCServer/GRPCServer.ts +++ b/src/grpc/lib/GRPCServer/GRPCServer.ts @@ -21,11 +21,11 @@ export default class GRPCServer { serverImpl: IServerImplType; serverCredentials: grpc.ServerCredentials; - constructor() { + constructor(host: string, port: string) { this.isInitialized = false; - this.serverHost = ""; - this.serverPort = ""; + this.serverHost = host; + this.serverPort = port; // Create Server this.server = new grpc.Server(); diff --git a/src/grpc/lib/GRPCServer/GRPCServerImpl.ts b/src/grpc/lib/GRPCServer/GRPCServerImpl.ts index 2530fe73f..3abc820b4 100644 --- a/src/grpc/lib/GRPCServer/GRPCServerImpl.ts +++ b/src/grpc/lib/GRPCServer/GRPCServerImpl.ts @@ -7,7 +7,6 @@ import { BindingEventRequest, BindingEventResponse, ListInputBindingsResponse, L import { TypeDaprBindingCallback } from "../../types/DaprBindingCallback.type"; import { TypeDaprPubSubCallback } from "../../types/DaprPubSubCallback.type"; - import { Empty } from "google-protobuf/google/protobuf/empty_pb"; import { Any } from "google-protobuf/google/protobuf/any_pb"; diff --git a/src/grpc/lib/GRPCServer/GRPCServerSingleton.ts b/src/grpc/lib/GRPCServer/GRPCServerSingleton.ts deleted file mode 100644 index 89361c7ff..000000000 --- a/src/grpc/lib/GRPCServer/GRPCServerSingleton.ts +++ /dev/null @@ -1,60 +0,0 @@ -import GRPCServer, { IServerImplType, IServerType } from "./GRPCServer"; - -export default class GRPCServerSingleton { - private static instance?: GRPCServer; - - /** - * Private constructor to prevent direct construction calls - */ - private constructor() {} - - public static async startServer(host: string, port: string): Promise { - const instance = this.getInstance(); - await instance.startServer(host, port); - - this.instance = instance; - - console.log("[Dapr-JS][gRPC] Created GRPC Server Singleton"); - } - - /** - * The static method that controls the access to the singleton instance. - * - * This implementation let you subclass the Singleton class while keeping - * just one instance of each subclass around. - */ - public static getInstance(): GRPCServer { - if (!this.instance) { - this.instance = new GRPCServer(); - } - - return this.instance; - } - - public static getServer(): IServerType { - const server = this.getInstance(); - return server.server; - } - - public static getServerImpl(): IServerImplType { - const server = this.getInstance(); - return server.serverImpl; - } - - public static getServerAddress(): string { - const server = this.getInstance(); - return server.getServerAddress(); - } - - public static async destroy(): Promise { - if (!this.instance) { - return; - } - - const server = this.getInstance(); - await server.close(); - console.log("[Dapr-JS] Destroyed GRPCServerSingleton"); - - delete this.instance; - } -} \ No newline at end of file diff --git a/src/grpc/lib/GRPCServer/actor.ts b/src/grpc/lib/GRPCServer/actor.ts new file mode 100644 index 000000000..6b11f3795 --- /dev/null +++ b/src/grpc/lib/GRPCServer/actor.ts @@ -0,0 +1,10 @@ +import GRPCServer from "./GRPCServer"; + +// https://docs.dapr.io/reference/api/actors_api/ +export default class DaprActor { + server: GRPCServer; + + constructor(server: GRPCServer) { + this.server = server; + } +} diff --git a/src/grpc/lib/GRPCServer/binding.ts b/src/grpc/lib/GRPCServer/binding.ts new file mode 100644 index 000000000..66f731ffe --- /dev/null +++ b/src/grpc/lib/GRPCServer/binding.ts @@ -0,0 +1,17 @@ +import { TypeDaprBindingCallback } from '../../types/DaprBindingCallback.type'; +import GRPCServer from './GRPCServer'; + +// https://docs.dapr.io/reference/api/bindings_api/ +export default class DaprBinding { + server: GRPCServer; + + constructor(server: GRPCServer) { + this.server = server; + } + + // Receive an input from an external system + async receive(bindingName: string, cb: TypeDaprBindingCallback) { + console.log(`Registering onBindingInput Handler: Binding = ${bindingName}`); + this.server.getServerImpl().registerInputBindingHandler(bindingName, cb); + } +} diff --git a/src/grpc/lib/GRPCServer/index.ts b/src/grpc/lib/GRPCServer/index.ts index 9171eac40..27de0b55c 100644 --- a/src/grpc/lib/GRPCServer/index.ts +++ b/src/grpc/lib/GRPCServer/index.ts @@ -1,10 +1,10 @@ -import WebServer, { IRequest, IResponse, IServerType } from "./GRPCServer"; -import WebServerSingleton from "./GRPCServerSingleton"; +import GRPCServer, { IRequest, IResponse, IServerType } from "./GRPCServer"; +import GRPCServerImpl from "./GRPCServerImpl"; export { IRequest, IResponse, IServerType, - WebServer, - WebServerSingleton + GRPCServer, + GRPCServerImpl } \ No newline at end of file diff --git a/src/grpc/lib/GRPCServer/invoker.ts b/src/grpc/lib/GRPCServer/invoker.ts new file mode 100644 index 000000000..1358282a6 --- /dev/null +++ b/src/grpc/lib/GRPCServer/invoker.ts @@ -0,0 +1,19 @@ +import { TypeDaprInvokerCallback } from '../../types/DaprInvokerCallback.type'; +import { InvokerListenOptionsType } from '../../types/InvokerListenOptions.type'; +import { HttpMethod } from '../../enum/HttpMethod.enum'; +import GRPCServer from './GRPCServer'; + +// https://docs.dapr.io/reference/api/service_invocation_api/ +export default class DaprInvoker { + server: GRPCServer; + + constructor(server: GRPCServer) { + this.server = server; + } + + async listen(methodName: string, cb: TypeDaprInvokerCallback, options: InvokerListenOptionsType = {}) { + const httpMethod: HttpMethod = options?.method?.toLowerCase() as HttpMethod || HttpMethod.GET; + console.log(`Registering onInvoke Handler ${httpMethod} /${methodName}`); + this.server.getServerImpl().registerOnInvokeHandler(httpMethod, methodName, cb); + } +} diff --git a/src/grpc/lib/GRPCServer/pubsub.ts b/src/grpc/lib/GRPCServer/pubsub.ts new file mode 100644 index 000000000..4226be331 --- /dev/null +++ b/src/grpc/lib/GRPCServer/pubsub.ts @@ -0,0 +1,16 @@ +import { TypeDaprPubSubCallback } from "../../types/DaprPubSubCallback.type"; +import GRPCServer from "./GRPCServer"; + +// https://docs.dapr.io/reference/api/pubsub_api/ +export default class DaprPubSub { + server: GRPCServer; + + constructor(server: GRPCServer) { + this.server = server; + } + + async subscribe(pubSubName: string, topic: string, cb: TypeDaprPubSubCallback) { + console.log(`Registering onTopicEvent Handler: PubSub = ${pubSubName}; Topic = ${topic}`); + this.server.getServerImpl().registerPubSubSubscriptionHandler(pubSubName, topic, cb); + } +} diff --git a/src/grpc/lib/binding.ts b/src/grpc/lib/binding.ts deleted file mode 100644 index 35b346871..000000000 --- a/src/grpc/lib/binding.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { InvokeBindingRequest, InvokeBindingResponse } from '../proto/dapr/proto/runtime/v1/dapr_pb'; -import { TypeDaprBindingCallback } from '../types/DaprBindingCallback.type'; -import GRPCClientSingleton from './GRPCClient/GRPCClientSingleton'; -import GRPCServerSingleton from './GRPCServer/GRPCServerSingleton'; - -// https://docs.dapr.io/reference/api/bindings_api/ -export default class DaprBinding { - // Receive an input from an external system - async receive(bindingName: string, cb: TypeDaprBindingCallback) { - const server = await GRPCServerSingleton.getServerImpl(); - console.log(`Registering onBindingInput Handler: Binding = ${bindingName}`); - server.registerInputBindingHandler(bindingName, cb); - } - - // Send an event to an external system - // @todo: should pass the metadata object - // @todo: should return a specific typed Promise instead of Promise - async send(bindingName: string, operation: string, data: any, metadata: object = {}): Promise { - const msgService = new InvokeBindingRequest(); - msgService.setName(bindingName); - msgService.setOperation(operation); - msgService.setData(Buffer.from(JSON.stringify(data), "utf-8")); - - return new Promise(async (resolve, reject) => { - const client = await GRPCClientSingleton.getClient(); - client.invokeBinding(msgService, (err, res: InvokeBindingResponse) => { - if (err) { - return reject(err); - } - - // https://docs.dapr.io/reference/api/bindings_api/#payload - return resolve({ - data: res.getData(), - metadata: res.getMetadataMap(), - operation - }); - }); - }) - - - - // // InvokeServiceRequest represents the request message for Service invocation. - // const msgInvokeService = new InvokeServiceRequest(); - // msgInvokeService.setId(appId); - - // const httpExtension = new HTTPExtension(); - // httpExtension.setVerb(HttpVerbUtil.convertHttpVerbStringToNumber(method)); - - // const msgSerialized = new Any(); - // msgSerialized.setValue(Buffer.from(JSON.stringify(data), "utf-8")); - - // const msgInvoke = new InvokeRequest(); - // msgInvoke.setMethod(methodName); - // msgInvoke.setHttpExtension(httpExtension); - // msgInvoke.setData(msgSerialized); - // msgInvoke.setContentType("application/json"); - - // msgInvokeService.setMessage(msgInvoke); - - // return new Promise(async (resolve, reject) => { - // const client = await GRPCClientSingleton.getClient(); - // client.invokeService(msgInvokeService, (err, res: InvokeResponse) => { - // if (err) { - // return reject(err); - // } - - // // const res = await fetch(`${this.daprUrl}/invoke/${appId}/method/${methodName}`, fetchOptions); - // // return ResponseUtil.handleResponse(res); - // const resContentType = res.getContentType(); - // const resData = Buffer.from((res.getData() as Any).getValue()).toString(); - - // return resolve({ - // body: resData, - // query: httpExtension.getQuerystring(), - // metadata: { - // contentType: resContentType - // } - // }); - // }) - // }) - } -}