From 9814cfb81b88e6af71e6f60078bbd2bda0efb1e5 Mon Sep 17 00:00:00 2001 From: chris-4chain <152964795+chris-4chain@users.noreply.github.com> Date: Fri, 12 Jul 2024 10:36:57 +0200 Subject: [PATCH] feat(SPV-848) notifications (#630) * feat(SPV-848): notifiactions & webhook notifier * feat(SVP-848): notifications manager * feat(SPV-848): notifications unit tests * feat(SPV-848): package notification moved to engine * feat(SPV-848): webhook_notifier tests * feat(SPV-848): webhook manager tests * feat(SPV-848): fix some tests * feat(SPV-848): call with token & test for ban fcn * feat(SPV-848): webhook subscribtion * feat(SPV-848): fix deadlock * feat(SPV-848): update webhook info * feat(SPV-848): blocking select instead of with-default Sleep * feat(SPV-848): unsubscribe * feat(SPV-848): event type * feat(SPV-848): event names by reflect::Name * feat(SPV-848): tidy things * fix(SPV-848): fix linter errors * feat(SPV-848): fix lint errors and a unit test; and a swagger comments * feat(SPV-848): regenerate swagger * feat(SPV-848): fix nil ptr exception * feat(SPV-848): fix minor unit tests * feat(SPV-848): adjust to self-review * feat(SPV-848): events defined in models package * feat(SPV-848): wrong log placement * feat(SPV-848): adjust to new ExtendedError approach * feat(SPV-848): adjust to review * feat(SVP-848): typo * feat(SPV-848): update swagger * feat(SPV-848): reimplement webhook model-repository logic * feat(SPV-848): webhooks errors and minor changes * feat(SPV-848): lint errors * feat(SPV-848): update swagger * feat(SPV-848): one empty file removed * feat(SPV-848): adjust to minor comments * feat(SPV-848): BanUntil * feat(SPV-848): remove mocked notifications --- actions/admin/routes.go | 2 + actions/admin/webhooks.go | 61 ++++ config.example.yaml | 2 - config/config.go | 2 - config/defaults.go | 3 +- config/services.go | 2 +- docs/docs.go | 106 +++++- docs/swagger.json | 106 +++++- docs/swagger.yaml | 65 +++- engine/client.go | 26 +- engine/client_internal.go | 32 +- engine/client_options.go | 27 +- engine/client_options_test.go | 6 +- engine/client_test.go | 1 - engine/db_model_transactions.go | 22 +- engine/definitions.go | 7 + engine/interface.go | 7 +- engine/model_destinations.go | 7 - engine/model_webhook.go | 195 +++++++++++ engine/models_internal.go | 20 -- engine/models_test.go | 3 +- engine/notifications/client.go | 80 ----- engine/notifications/client_options.go | 54 --- engine/notifications/client_options_test.go | 62 ---- engine/notifications/client_test.go | 29 -- engine/notifications/event_utils.go | 56 +++ engine/notifications/event_utils_test.go | 32 ++ engine/notifications/interface.go | 30 +- engine/notifications/mock_sender.go | 26 ++ engine/notifications/notifications.go | 107 +++--- engine/notifications/notifications_test.go | 247 +++++++++---- engine/notifications/webhook_manager.go | 203 +++++++++++ engine/notifications/webhook_manager_test.go | 121 +++++++ engine/notifications/webhook_notifier.go | 151 ++++++++ engine/notifications/webhook_notifier_test.go | 330 ++++++++++++++++++ engine/spverrors/definitions.go | 14 + engine/sync_tx_service.go | 4 - models/notifications.go | 46 +++ 38 files changed, 1819 insertions(+), 475 deletions(-) create mode 100644 actions/admin/webhooks.go create mode 100644 engine/model_webhook.go delete mode 100644 engine/notifications/client.go delete mode 100644 engine/notifications/client_options.go delete mode 100644 engine/notifications/client_options_test.go delete mode 100644 engine/notifications/client_test.go create mode 100644 engine/notifications/event_utils.go create mode 100644 engine/notifications/event_utils_test.go create mode 100644 engine/notifications/mock_sender.go create mode 100644 engine/notifications/webhook_manager.go create mode 100644 engine/notifications/webhook_manager_test.go create mode 100644 engine/notifications/webhook_notifier.go create mode 100644 engine/notifications/webhook_notifier_test.go create mode 100644 models/notifications.go diff --git a/actions/admin/routes.go b/actions/admin/routes.go index cbf3d231..aedc8e24 100644 --- a/actions/admin/routes.go +++ b/actions/admin/routes.go @@ -42,6 +42,8 @@ func NewHandler(appConfig *config.AppConfig, services *config.AppServices) route adminGroup.POST("/xpub", action.xpubsCreate) adminGroup.POST("/xpubs/search", action.xpubsSearch) adminGroup.POST("/xpubs/count", action.xpubsCount) + adminGroup.POST("/webhooks/subscriptions", action.subscribeWebhook) + adminGroup.DELETE("/webhooks/subscriptions", action.unsubscribeWebhook) }) return adminEndpoints diff --git a/actions/admin/webhooks.go b/actions/admin/webhooks.go new file mode 100644 index 00000000..f79ba974 --- /dev/null +++ b/actions/admin/webhooks.go @@ -0,0 +1,61 @@ +package admin + +import ( + "net/http" + + "github.com/bitcoin-sv/spv-wallet/engine/spverrors" + "github.com/bitcoin-sv/spv-wallet/models" + "github.com/gin-gonic/gin" +) + +// subscribeWebhook will subscribe to a webhook to receive notifications +// @Summary Subscribe to a webhook +// @Description Subscribe to a webhook to receive notifications +// @Tags Admin +// @Produce json +// @Param SubscribeRequestBody body models.SubscribeRequestBody false "URL to subscribe to and optional token header and value" +// @Success 200 {boolean} bool "Success response" +// @Failure 500 "Internal server error - Error while subscribing to the webhook" +// @Router /v1/admin/webhooks/subscriptions [post] +// @Security x-auth-xpub +func (a *Action) subscribeWebhook(c *gin.Context) { + requestBody := models.SubscribeRequestBody{} + if err := c.Bind(&requestBody); err != nil { + c.JSON(http.StatusBadRequest, err.Error()) + return + } + + err := a.Services.SpvWalletEngine.SubscribeWebhook(c.Request.Context(), requestBody.URL, requestBody.TokenHeader, requestBody.TokenValue) + if err != nil { + spverrors.ErrorResponse(c, err, a.Services.Logger) + return + } + + c.JSON(http.StatusOK, true) +} + +// unsubscribeWebhook will unsubscribe to a webhook to receive notifications +// @Summary Unsubscribe to a webhook +// @Description Unsubscribe to a webhook to stop receiving notifications +// @Tags Admin +// @Produce json +// @Param UnsubscribeRequestBody body models.UnsubscribeRequestBody false "URL to unsubscribe from" +// @Success 200 {boolean} bool "Success response" +// @Failure 500 "Internal server error - Error while unsubscribing to the webhook" +// @Router /v1/admin/webhooks/subscriptions [delete] +// @Security x-auth-xpub +func (a *Action) unsubscribeWebhook(c *gin.Context) { + requestModel := models.UnsubscribeRequestBody{} + if err := c.Bind(&requestModel); err != nil { + c.JSON(http.StatusBadRequest, err.Error()) + return + } + + err := a.Services.SpvWalletEngine.UnsubscribeWebhook(c.Request.Context(), requestModel.URL) + if err != nil { + spverrors.ErrorResponse(c, err, a.Services.Logger) + return + } + + c.JSON(http.StatusOK, true) +} diff --git a/config.example.yaml b/config.example.yaml index d02d566d..0e1b14ae 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -94,8 +94,6 @@ nodes: bytes: 1000 notifications: enabled: false - # url to send notifications - webhook_endpoint: "" paymail: beef: block_headers_service_auth_token: mQZQ6WmxURxWz5ch diff --git a/config/config.go b/config/config.go index 4d57302d..d4907c97 100644 --- a/config/config.go +++ b/config/config.go @@ -174,8 +174,6 @@ type ArcAPI struct { // NotificationsConfig is the configuration for notifications type NotificationsConfig struct { - // WebhookEndpoint is the endpoint for webhook registration. - WebhookEndpoint string `json:"webhook_endpoint" mapstructure:"webhook_endpoint"` // Enabled is the flag that enables notifications service. Enabled bool `json:"enabled" mapstructure:"enabled"` } diff --git a/config/defaults.go b/config/defaults.go index 36d0ed04..96ece8be 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -131,8 +131,7 @@ func getNodesDefaults() *NodesConfig { func getNotificationDefaults() *NotificationsConfig { return &NotificationsConfig{ - Enabled: false, - WebhookEndpoint: "", + Enabled: true, } } diff --git a/config/services.go b/config/services.go index 22a0ebe0..a637b270 100644 --- a/config/services.go +++ b/config/services.go @@ -179,7 +179,7 @@ func (s *AppServices) loadSPVWallet(ctx context.Context, appConfig *AppConfig, t options = loadTaskManager(appConfig, options) if appConfig.Notifications != nil && appConfig.Notifications.Enabled { - options = append(options, engine.WithNotifications(appConfig.Notifications.WebhookEndpoint)) + options = append(options, engine.WithNotifications()) } options = loadBroadcastClientArc(appConfig, options, logger) diff --git a/docs/docs.go b/docs/docs.go index f321ce12..cdce133f 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1079,6 +1079,80 @@ const docTemplate = `{ } } }, + "/v1/admin/webhooks/subscriptions": { + "post": { + "security": [ + { + "x-auth-xpub": [] + } + ], + "description": "Subscribe to a webhook to receive notifications", + "produces": [ + "application/json" + ], + "tags": [ + "Admin" + ], + "summary": "Subscribe to a webhook", + "parameters": [ + { + "description": "URL to subscribe to and optional token header and value", + "name": "SubscribeRequestBody", + "in": "body", + "schema": { + "$ref": "#/definitions/models.SubscribeRequestBody" + } + } + ], + "responses": { + "200": { + "description": "Success response", + "schema": { + "type": "boolean" + } + }, + "500": { + "description": "Internal server error - Error while subscribing to the webhook" + } + } + }, + "delete": { + "security": [ + { + "x-auth-xpub": [] + } + ], + "description": "Unsubscribe to a webhook to stop receiving notifications", + "produces": [ + "application/json" + ], + "tags": [ + "Admin" + ], + "summary": "Unsubscribe to a webhook", + "parameters": [ + { + "description": "URL to unsubscribe from", + "name": "UnsubscribeRequestBody", + "in": "body", + "schema": { + "$ref": "#/definitions/models.UnsubscribeRequestBody" + } + } + ], + "responses": { + "200": { + "description": "Success response", + "schema": { + "type": "boolean" + } + }, + "500": { + "description": "Internal server error - Error while unsubscribing to the webhook" + } + } + } + }, "/v1/admin/xpub": { "post": { "security": [ @@ -4005,6 +4079,20 @@ const docTemplate = `{ } } }, + "models.SubscribeRequestBody": { + "type": "object", + "properties": { + "tokenHeader": { + "type": "string" + }, + "tokenValue": { + "type": "string" + }, + "url": { + "type": "string" + } + } + }, "models.SyncConfig": { "type": "object", "properties": { @@ -4390,6 +4478,14 @@ const docTemplate = `{ } } }, + "models.UnsubscribeRequestBody": { + "type": "object", + "properties": { + "url": { + "type": "string" + } + } + }, "models.Utxo": { "type": "object", "properties": { @@ -4556,14 +4652,11 @@ const docTemplate = `{ 1000000000, 60000000000, 3600000000000, - -9223372036854775808, - 9223372036854775807, 1, 1000, 1000000, 1000000000, - 60000000000, - 3600000000000 + 60000000000 ], "x-enum-varnames": [ "minDuration", @@ -4574,14 +4667,11 @@ const docTemplate = `{ "Second", "Minute", "Hour", - "minDuration", - "maxDuration", "Nanosecond", "Microsecond", "Millisecond", "Second", - "Minute", - "Hour" + "Minute" ] }, "transactions.NewTransaction": { diff --git a/docs/swagger.json b/docs/swagger.json index e693565d..5c1628e6 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1070,6 +1070,80 @@ } } }, + "/v1/admin/webhooks/subscriptions": { + "post": { + "security": [ + { + "x-auth-xpub": [] + } + ], + "description": "Subscribe to a webhook to receive notifications", + "produces": [ + "application/json" + ], + "tags": [ + "Admin" + ], + "summary": "Subscribe to a webhook", + "parameters": [ + { + "description": "URL to subscribe to and optional token header and value", + "name": "SubscribeRequestBody", + "in": "body", + "schema": { + "$ref": "#/definitions/models.SubscribeRequestBody" + } + } + ], + "responses": { + "200": { + "description": "Success response", + "schema": { + "type": "boolean" + } + }, + "500": { + "description": "Internal server error - Error while subscribing to the webhook" + } + } + }, + "delete": { + "security": [ + { + "x-auth-xpub": [] + } + ], + "description": "Unsubscribe to a webhook to stop receiving notifications", + "produces": [ + "application/json" + ], + "tags": [ + "Admin" + ], + "summary": "Unsubscribe to a webhook", + "parameters": [ + { + "description": "URL to unsubscribe from", + "name": "UnsubscribeRequestBody", + "in": "body", + "schema": { + "$ref": "#/definitions/models.UnsubscribeRequestBody" + } + } + ], + "responses": { + "200": { + "description": "Success response", + "schema": { + "type": "boolean" + } + }, + "500": { + "description": "Internal server error - Error while unsubscribing to the webhook" + } + } + } + }, "/v1/admin/xpub": { "post": { "security": [ @@ -3996,6 +4070,20 @@ } } }, + "models.SubscribeRequestBody": { + "type": "object", + "properties": { + "tokenHeader": { + "type": "string" + }, + "tokenValue": { + "type": "string" + }, + "url": { + "type": "string" + } + } + }, "models.SyncConfig": { "type": "object", "properties": { @@ -4381,6 +4469,14 @@ } } }, + "models.UnsubscribeRequestBody": { + "type": "object", + "properties": { + "url": { + "type": "string" + } + } + }, "models.Utxo": { "type": "object", "properties": { @@ -4547,14 +4643,11 @@ 1000000000, 60000000000, 3600000000000, - -9223372036854775808, - 9223372036854775807, 1, 1000, 1000000, 1000000000, - 60000000000, - 3600000000000 + 60000000000 ], "x-enum-varnames": [ "minDuration", @@ -4565,14 +4658,11 @@ "Second", "Minute", "Hour", - "minDuration", - "maxDuration", "Nanosecond", "Microsecond", "Millisecond", "Second", - "Minute", - "Hour" + "Minute" ] }, "transactions.NewTransaction": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 15ef6548..2caabe22 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1373,6 +1373,15 @@ definitions: type: string type: array type: object + models.SubscribeRequestBody: + properties: + tokenHeader: + type: string + tokenValue: + type: string + url: + type: string + type: object models.SyncConfig: properties: broadcast: @@ -1656,6 +1665,11 @@ definitions: example: false type: boolean type: object + models.UnsubscribeRequestBody: + properties: + url: + type: string + type: object models.Utxo: properties: created_at: @@ -1787,14 +1801,11 @@ definitions: - 1000000000 - 60000000000 - 3600000000000 - - -9223372036854775808 - - 9223372036854775807 - 1 - 1000 - 1000000 - 1000000000 - 60000000000 - - 3600000000000 type: integer x-enum-varnames: - minDuration @@ -1805,14 +1816,11 @@ definitions: - Second - Minute - Hour - - minDuration - - maxDuration - Nanosecond - Microsecond - Millisecond - Second - Minute - - Hour transactions.NewTransaction: properties: config: @@ -2562,6 +2570,51 @@ paths: summary: Search for utxos tags: - Admin + /v1/admin/webhooks/subscriptions: + delete: + description: Unsubscribe to a webhook to stop receiving notifications + parameters: + - description: URL to unsubscribe from + in: body + name: UnsubscribeRequestBody + schema: + $ref: '#/definitions/models.UnsubscribeRequestBody' + produces: + - application/json + responses: + "200": + description: Success response + schema: + type: boolean + "500": + description: Internal server error - Error while unsubscribing to the webhook + security: + - x-auth-xpub: [] + summary: Unsubscribe to a webhook + tags: + - Admin + post: + description: Subscribe to a webhook to receive notifications + parameters: + - description: URL to subscribe to and optional token header and value + in: body + name: SubscribeRequestBody + schema: + $ref: '#/definitions/models.SubscribeRequestBody' + produces: + - application/json + responses: + "200": + description: Success response + schema: + type: boolean + "500": + description: Internal server error - Error while subscribing to the webhook + security: + - x-auth-xpub: [] + summary: Subscribe to a webhook + tags: + - Admin /v1/admin/xpub: post: description: Create xPub diff --git a/engine/client.go b/engine/client.go index c0bfa83e..059eb1a4 100644 --- a/engine/client.go +++ b/engine/client.go @@ -92,9 +92,9 @@ type ( // notificationsOptions holds the configuration for notifications notificationsOptions struct { - notifications.ClientInterface // Notifications client - options []notifications.ClientOps // List of options - webhookEndpoint string // Webhook endpoint + enabled bool + client *notifications.Notifications + webhookManager *notifications.WebhookManager } // paymailOptions holds the configuration for Paymail @@ -173,7 +173,7 @@ func NewClient(ctx context.Context, opts ...ClientOps) (ClientInterface, error) } // Load the Notification client (if client does not exist) - if err = client.loadNotificationClient(); err != nil { + if err = client.loadNotificationClient(ctx); err != nil { return nil, err } @@ -313,11 +313,6 @@ func (c *Client) Debug(on bool) { if ds := c.Datastore(); ds != nil { ds.Debug(on) } - - // Set debugging on the Notifications - if n := c.Notifications(); n != nil { - n.Debug(on) - } } // DefaultSyncConfig will return the default sync config from the client defaults (for chainstate) @@ -390,16 +385,11 @@ func (c *Client) Logger() *zerolog.Logger { } // Notifications will return the Notifications if it exists -func (c *Client) Notifications() notifications.ClientInterface { - if c.options.notifications != nil && c.options.notifications.ClientInterface != nil { - return c.options.notifications.ClientInterface +func (c *Client) Notifications() *notifications.Notifications { + if c.options.notifications == nil { + return nil } - return nil -} - -// SetNotificationsClient will overwrite the notification's client with the given client -func (c *Client) SetNotificationsClient(client notifications.ClientInterface) { - c.options.notifications.ClientInterface = client + return c.options.notifications.client } // Taskmanager will return the Taskmanager if it exists diff --git a/engine/client_internal.go b/engine/client_internal.go index 7fd7560b..f92c80f7 100644 --- a/engine/client_internal.go +++ b/engine/client_internal.go @@ -9,6 +9,7 @@ import ( "github.com/bitcoin-sv/spv-wallet/engine/cluster" "github.com/bitcoin-sv/spv-wallet/engine/datastore" "github.com/bitcoin-sv/spv-wallet/engine/notifications" + "github.com/bitcoin-sv/spv-wallet/engine/spverrors" "github.com/bitcoin-sv/spv-wallet/engine/taskmanager" "github.com/mrz1836/go-cachestore" ) @@ -82,14 +83,37 @@ func (c *Client) loadDatastore(ctx context.Context) (err error) { } // loadNotificationClient will load the notifications client -func (c *Client) loadNotificationClient() (err error) { - // Load notification if a custom interface was NOT provided - if c.options.notifications.ClientInterface == nil { - c.options.notifications.ClientInterface, err = notifications.NewClient(c.options.notifications.options...) +func (c *Client) loadNotificationClient(ctx context.Context) (err error) { + if c.options.notifications == nil || !c.options.notifications.enabled { + return } + logger := c.Logger().With().Str("subservice", "notification").Logger() + notificationService := notifications.NewNotifications(ctx, &logger) + c.options.notifications.client = notificationService + c.options.notifications.webhookManager = notifications.NewWebhookManager(ctx, &logger, notificationService, &WebhooksRepository{client: c}) return } +func (c *Client) SubscribeWebhook(ctx context.Context, url, tokenHeader, token string) error { + if c.options.notifications == nil || c.options.notifications.webhookManager == nil { + return spverrors.ErrNotificationsDisabled + } + + err := c.options.notifications.webhookManager.Subscribe(ctx, url, tokenHeader, token) + if err != nil { + return spverrors.ErrWebhookSubscriptionFailed + } + return nil +} + +func (c *Client) UnsubscribeWebhook(ctx context.Context, url string) error { + if c.options.notifications == nil || c.options.notifications.webhookManager == nil { + return spverrors.ErrNotificationsDisabled + } + + return c.options.notifications.webhookManager.Unsubscribe(ctx, url) +} + // loadPaymailClient will load the Paymail client func (c *Client) loadPaymailClient() (err error) { // Only load if it's not set (the client can be overloaded) diff --git a/engine/client_options.go b/engine/client_options.go index be421bff..d56e9f93 100644 --- a/engine/client_options.go +++ b/engine/client_options.go @@ -15,7 +15,8 @@ import ( "github.com/bitcoin-sv/spv-wallet/engine/datastore" "github.com/bitcoin-sv/spv-wallet/engine/logging" "github.com/bitcoin-sv/spv-wallet/engine/metrics" - "github.com/bitcoin-sv/spv-wallet/engine/notifications" + + // "github.com/bitcoin-sv/spv-wallet/engine/notifications" "github.com/bitcoin-sv/spv-wallet/engine/taskmanager" "github.com/bitcoin-sv/spv-wallet/engine/utils" "github.com/coocood/freecache" @@ -85,12 +86,6 @@ func defaultClientOptions() *clientOptions { // Blank NewRelic config newRelic: &newRelicOptions{}, - // Blank notifications config - notifications: ¬ificationsOptions{ - ClientInterface: nil, - webhookEndpoint: "", - }, - // Blank Paymail config paymail: &paymailOptions{ client: nil, @@ -211,7 +206,6 @@ func WithDebugging() ClientOps { c.cacheStore.options = append(c.cacheStore.options, cachestore.WithDebugging()) c.chainstate.options = append(c.chainstate.options, chainstate.WithDebugging()) c.dataStore.options = append(c.dataStore.options, datastore.WithDebugging()) - c.notifications.options = append(c.notifications.options, notifications.WithDebugging()) } } @@ -260,10 +254,8 @@ func WithLogger(customLogger *zerolog.Logger) ClientOps { // Enable the logger on all SPV Wallet Engine services chainstateLogger := customLogger.With().Str("subservice", "chainstate").Logger() taskManagerLogger := customLogger.With().Str("subservice", "taskManager").Logger() - notificationsLogger := customLogger.With().Str("subservice", "notifications").Logger() c.chainstate.options = append(c.chainstate.options, chainstate.WithLogger(&chainstateLogger)) c.taskManager.options = append(c.taskManager.options, taskmanager.WithLogger(&taskManagerLogger)) - c.notifications.options = append(c.notifications.options, notifications.WithLogger(¬ificationsLogger)) // Enable the logger on all external services var datastoreLogger *logging.GormLoggerAdapter @@ -607,19 +599,10 @@ func WithExcludedProviders(providers []string) ClientOps { // ----------------------------------------------------------------- // WithNotifications will set the notifications config -func WithNotifications(webhookEndpoint string) ClientOps { - return func(c *clientOptions) { - if len(webhookEndpoint) > 0 { - c.notifications.webhookEndpoint = webhookEndpoint - } - } -} - -// WithCustomNotifications will set a custom notifications interface -func WithCustomNotifications(customNotifications notifications.ClientInterface) ClientOps { +func WithNotifications() ClientOps { return func(c *clientOptions) { - if customNotifications != nil { - c.notifications.ClientInterface = customNotifications + c.notifications = ¬ificationsOptions{ + enabled: true, } } } diff --git a/engine/client_options_test.go b/engine/client_options_test.go index b277e449..5a719918 100644 --- a/engine/client_options_test.go +++ b/engine/client_options_test.go @@ -600,7 +600,7 @@ func TestWithModels(t *testing.T) { ModelXPub.String(), ModelAccessKey.String(), ModelDraftTransaction.String(), ModelTransaction.String(), ModelSyncTransaction.String(), ModelDestination.String(), - ModelUtxo.String(), ModelContact.String(), + ModelUtxo.String(), ModelContact.String(), ModelWebhook.String(), }, tc.GetModelNames()) }) @@ -618,7 +618,7 @@ func TestWithModels(t *testing.T) { ModelXPub.String(), ModelAccessKey.String(), ModelDraftTransaction.String(), ModelTransaction.String(), ModelSyncTransaction.String(), ModelDestination.String(), - ModelUtxo.String(), ModelContact.String(), ModelPaymailAddress.String(), + ModelUtxo.String(), ModelContact.String(), ModelWebhook.String(), ModelPaymailAddress.String(), }, tc.GetModelNames()) }) } @@ -813,6 +813,7 @@ func TestWithAutoMigrate(t *testing.T) { ModelDestination.String(), ModelUtxo.String(), ModelContact.String(), + ModelWebhook.String(), }, tc.GetModelNames()) }) @@ -835,6 +836,7 @@ func TestWithAutoMigrate(t *testing.T) { ModelDestination.String(), ModelUtxo.String(), ModelContact.String(), + ModelWebhook.String(), ModelPaymailAddress.String(), }, tc.GetModelNames()) }) diff --git a/engine/client_test.go b/engine/client_test.go index 3114d14f..136dd175 100644 --- a/engine/client_test.go +++ b/engine/client_test.go @@ -35,7 +35,6 @@ func TestClient_Debug(t *testing.T) { assert.Equal(t, true, tc.IsDebug()) assert.Equal(t, true, tc.Cachestore().IsDebug()) assert.Equal(t, true, tc.Datastore().IsDebug()) - assert.Equal(t, true, tc.Notifications().IsDebug()) }) } diff --git a/engine/db_model_transactions.go b/engine/db_model_transactions.go index d4ea2e51..c1ee4001 100644 --- a/engine/db_model_transactions.go +++ b/engine/db_model_transactions.go @@ -6,6 +6,7 @@ import ( "github.com/bitcoin-sv/spv-wallet/engine/datastore" "github.com/bitcoin-sv/spv-wallet/engine/notifications" "github.com/bitcoin-sv/spv-wallet/engine/spverrors" + "github.com/bitcoin-sv/spv-wallet/models" ) // GetModelTableName will get the db table name of the current model @@ -106,8 +107,7 @@ func (m *Transaction) AfterCreated(ctx context.Context) error { } } - // Fire notifications (this is already in a go routine) - notify(notifications.EventTypeCreate, m) + m.notify() m.Client().Logger().Debug(). Str("txID", m.ID). @@ -121,8 +121,7 @@ func (m *Transaction) AfterUpdated(_ context.Context) error { Str("txID", m.ID). Msgf("starting: %s AfterUpdated hook...", m.Name()) - // Fire notifications (this is already in a go routine) - notify(notifications.EventTypeUpdate, m) + m.notify() m.Client().Logger().Debug(). Str("txID", m.ID). @@ -134,9 +133,6 @@ func (m *Transaction) AfterUpdated(_ context.Context) error { func (m *Transaction) AfterDeleted(_ context.Context) error { m.Client().Logger().Debug().Msgf("starting: %s AfterDeleted hook...", m.Name()) - // Fire notifications (this is already in a go routine) - notify(notifications.EventTypeDelete, m) - m.Client().Logger().Debug().Msgf("end: %s AfterDeleted hook", m.Name()) return nil } @@ -183,3 +179,15 @@ func (m *Transaction) migratePostgreSQL(client datastore.ClientInterface, tableN return nil } + +func (m *Transaction) notify() { + if n := m.Client().Notifications(); n != nil { + notifications.Notify(n, &models.TransactionEvent{ + UserEvent: models.UserEvent{ + XPubID: m.XPubID, + }, + TransactionID: m.ID, + Status: m.TxStatus, + }) + } +} diff --git a/engine/definitions.go b/engine/definitions.go index a085bf8e..325f00c6 100644 --- a/engine/definitions.go +++ b/engine/definitions.go @@ -35,6 +35,7 @@ const ( ModelUtxo ModelName = "utxo" ModelXPub ModelName = "xpub" ModelContact ModelName = "contact" + ModelWebhook ModelName = "webhook" ) // AllModelNames is a list of all models @@ -49,6 +50,7 @@ var AllModelNames = []ModelName{ ModelUtxo, ModelXPub, ModelContact, + ModelWebhook, } // Internal table names @@ -62,6 +64,7 @@ const ( tableUTXOs = "utxos" tableXPubs = "xpubs" tableContacts = "contacts" + tableWebhooks = "webhooks" ) const ( @@ -173,6 +176,10 @@ var BaseModels = []interface{}{ Model: *NewBaseModel(ModelContact), }, + &Webhook{ + Model: *NewBaseModel(ModelWebhook), + }, + // Paymail addresses related to XPubs (automatically added when paymail is enabled) /*&PaymailAddress{ Model: *NewBaseModel(ModelPaymailAddress), diff --git a/engine/interface.go b/engine/interface.go index 2baaf8af..c47f17fd 100644 --- a/engine/interface.go +++ b/engine/interface.go @@ -11,6 +11,8 @@ import ( "github.com/bitcoin-sv/spv-wallet/engine/datastore" "github.com/bitcoin-sv/spv-wallet/engine/metrics" "github.com/bitcoin-sv/spv-wallet/engine/notifications" + + // "github.com/bitcoin-sv/spv-wallet/engine/notifications" "github.com/bitcoin-sv/spv-wallet/engine/taskmanager" "github.com/mrz1836/go-cachestore" "github.com/rs/zerolog" @@ -52,7 +54,7 @@ type ClientService interface { Datastore() datastore.ClientInterface HTTPClient() HTTPInterface Logger() *zerolog.Logger - Notifications() notifications.ClientInterface + Notifications() *notifications.Notifications PaymailClient() paymail.ClientInterface Taskmanager() taskmanager.TaskEngine } @@ -203,8 +205,9 @@ type ClientInterface interface { IsIUCEnabled() bool IsMigrationEnabled() bool IsNewRelicEnabled() bool - SetNotificationsClient(notifications.ClientInterface) UserAgent() string Version() string Metrics() (metrics *metrics.Metrics, enabled bool) + SubscribeWebhook(ctx context.Context, url, tokenHeader, token string) error + UnsubscribeWebhook(ctx context.Context, url string) error } diff --git a/engine/model_destinations.go b/engine/model_destinations.go index 73304902..f28abe2b 100644 --- a/engine/model_destinations.go +++ b/engine/model_destinations.go @@ -7,7 +7,6 @@ import ( "github.com/bitcoin-sv/spv-wallet/engine/cluster" "github.com/bitcoin-sv/spv-wallet/engine/datastore" - "github.com/bitcoin-sv/spv-wallet/engine/notifications" "github.com/bitcoin-sv/spv-wallet/engine/spverrors" "github.com/bitcoin-sv/spv-wallet/engine/utils" "github.com/bitcoinschema/go-bitcoin/v2" @@ -368,8 +367,6 @@ func (m *Destination) AfterCreated(ctx context.Context) error { return err } - notify(notifications.EventTypeCreate, m) - m.Client().Logger().Debug(). Str("destinationID", m.ID). Msgf("end: %s AfterCreated hook", m.Name()) @@ -419,8 +416,6 @@ func (m *Destination) AfterUpdated(ctx context.Context) error { return err } - notify(notifications.EventTypeUpdate, m) - m.Client().Logger().Debug(). Str("destinationID", m.ID). Msgf("end: %s AfterUpdated hook", m.Name()) @@ -450,8 +445,6 @@ func (m *Destination) AfterDeleted(ctx context.Context) error { } } - notify(notifications.EventTypeDelete, m) - m.Client().Logger().Debug(). Str("destinationID", m.ID). Msgf("end: %s AfterDeleted hook", m.Name()) diff --git a/engine/model_webhook.go b/engine/model_webhook.go new file mode 100644 index 00000000..be63379d --- /dev/null +++ b/engine/model_webhook.go @@ -0,0 +1,195 @@ +package engine + +import ( + "context" + "time" + + "github.com/bitcoin-sv/spv-wallet/engine/datastore" + customTypes "github.com/bitcoin-sv/spv-wallet/engine/datastore/customtypes" + "github.com/bitcoin-sv/spv-wallet/engine/notifications" + "github.com/pkg/errors" +) + +// Webhook stores information about subscriptions to notifications via webhooks +// +// Gorm related models & indexes: https://gorm.io/docs/models.html - https://gorm.io/docs/indexes.html +type Webhook struct { + // Base model + Model `bson:",inline"` + + URL string `json:"url" toml:"url" yaml:"url" gorm:"<-create;primaryKey;comment:This is the url on which notifications will be sent" bson:"url"` + TokenHeader string `json:"token_header" toml:"token_header" yaml:"token_header" gorm:"<-create;comment:This is optional token header to be sent" bson:"token_header"` + Token string `json:"token" toml:"token" yaml:"token" gorm:"<-create;comment:This is optional token to be sent" bson:"token"` + BannedTo customTypes.NullTime `json:"banned_to" toml:"banned_to" yaml:"banned_to" gorm:"comment:The time until the webhook will be banned" bson:"banned_to"` +} + +func newWebhook(url, tokenHeader, token string, opts ...ModelOps) *Webhook { + return &Webhook{ + Model: *NewBaseModel(ModelWebhook, opts...), + URL: url, + TokenHeader: tokenHeader, + Token: token, + } +} + +func getWebhooks(ctx context.Context, conditions map[string]any, opts ...ModelOps) ([]*Webhook, error) { + modelItems := make([]*Webhook, 0) + if err := getModelsByConditions(ctx, ModelAccessKey, &modelItems, nil, conditions, nil, opts...); err != nil { + return nil, err + } + + return modelItems, nil +} + +// GetModelName will get the name of the current model +func (m *Webhook) GetModelName() string { + return ModelWebhook.String() +} + +// GetModelTableName will get the db table name of the current model +func (m *Webhook) GetModelTableName() string { + return tableWebhooks +} + +// Save will save the model into the Datastore +func (m *Webhook) Save(ctx context.Context) error { + return Save(ctx, m) +} + +// GetID will get the ID +func (m *Webhook) GetID() string { + return m.URL +} + +// BeforeCreating will fire before the model is being inserted into the Datastore +func (m *Webhook) BeforeCreating(_ context.Context) error { + return nil +} + +// Migrate model specific migration on startup +func (m *Webhook) Migrate(client datastore.ClientInterface) error { + return client.IndexMetadata(client.GetTableName(tableAccessKeys), metadataField) +} + +func (m *Webhook) delete() { + m.DeletedAt.Valid = true + m.DeletedAt.Time = time.Now() +} + +// Banned returns true if the webhook is banned right now +func (m *Webhook) Banned() bool { + if m.BannedTo.Valid == false { + return false + } + ret := !time.Now().After(m.BannedTo.Time) + return ret +} + +// GetURL returns the URL of the webhook +func (m *Webhook) GetURL() string { + return m.URL +} + +// GetTokenHeader returns the token header of the webhook +func (m *Webhook) GetTokenHeader() string { + return m.TokenHeader +} + +// GetTokenValue returns the token value of the webhook +func (m *Webhook) GetTokenValue() string { + return m.Token +} + +// BanUntil sets BannedTo field to the given time +func (m *Webhook) BanUntil(bannedTo time.Time) { + m.BannedTo.Valid = true + m.BannedTo.Time = bannedTo +} + +// Refresh sets the DeletedAt and BannedTo fields to the zero value and updates the token header and value +func (m *Webhook) Refresh(tokenHeader, tokenValue string) { + m.DeletedAt.Valid = false + m.BannedTo.Valid = false + m.TokenHeader = tokenHeader + m.Token = tokenValue +} + +// Deleted returns true if the webhook is deleted +func (m *Webhook) Deleted() bool { + return m.DeletedAt.Valid == true +} + +// WebhooksRepository is the repository for webhooks. It implements the WebhooksRepository interface +type WebhooksRepository struct { + client *Client +} + +// Create makes a new webhook instance and saves it to the database, it will fail if the webhook already exists in the database +func (wr *WebhooksRepository) Create(ctx context.Context, url, tokenHeader, tokenValue string) error { + opts := append(wr.client.DefaultModelOptions(), New()) + model := newWebhook(url, tokenHeader, tokenValue, opts...) + return model.Save(ctx) +} + +// Save stores a model in the database +func (wr *WebhooksRepository) Save(ctx context.Context, model notifications.ModelWebhook) error { + webhook, ok := model.(*Webhook) + if !ok { + return errors.New("Unknown implementation of notifications.ModelWebhook") + } + err := webhook.Save(ctx) + if err != nil { + return errors.Wrap(err, "Cannot save the ModelWebhook") + } + return nil +} + +// Save stores a model in the database +func (wr *WebhooksRepository) Delete(ctx context.Context, model notifications.ModelWebhook) error { + webhook, ok := model.(*Webhook) + if !ok { + return errors.New("Unknown implementation of notifications.ModelWebhook") + } + webhook.delete() + err := webhook.Save(ctx) + if err != nil { + return errors.Wrap(err, "Cannot save the ModelWebhook") + } + return nil +} + +// GetByURL gets a webhook by its URL. If the webhook does not exist, it returns a nil pointer and no error +func (wr *WebhooksRepository) GetByURL(ctx context.Context, url string) (notifications.ModelWebhook, error) { + conditions := map[string]any{ + "url": url, + } + + webhook := &Webhook{} + webhook.enrich(ModelWebhook, wr.client.DefaultModelOptions()...) + + if err := Get(ctx, webhook, conditions, false, defaultDatabaseReadTimeout, false); err != nil { + if errors.Is(err, datastore.ErrNoResults) { + return nil, nil + } + return nil, err + } + + return webhook, nil +} + +// GetAll gets all webhooks from the database +func (wr *WebhooksRepository) GetAll(ctx context.Context) ([]notifications.ModelWebhook, error) { + conditions := map[string]any{ + deletedAtField: nil, + } + list, err := getWebhooks(ctx, conditions, wr.client.DefaultModelOptions()...) + if err != nil { + return nil, err + } + // map to slice of ModelWebhook + res := make([]notifications.ModelWebhook, len(list)) + for i, elem := range list { + res[i] = elem + } + return res, nil +} diff --git a/engine/models_internal.go b/engine/models_internal.go index 08cae7bf..e333195d 100644 --- a/engine/models_internal.go +++ b/engine/models_internal.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/bitcoin-sv/spv-wallet/engine/notifications" "github.com/bitcoin-sv/spv-wallet/engine/spverrors" ) @@ -158,25 +157,6 @@ func incrementField(ctx context.Context, model ModelInterface, fieldName string, return newValue, nil } -// notify about an event on the model -func notify(eventType notifications.EventType, model interface{}) { - // run the notifications in a separate goroutine since there could be significant network delay - // communicating with a notification provider - - go func() { - m := model.(ModelInterface) - if client := m.Client(); client != nil { - if n := client.Notifications(); n != nil { - if err := n.Notify( - context.Background(), m.GetModelName(), eventType, model, m.GetID(), - ); err != nil { - client.Logger().Error().Msgf("failed notifying about %s on %s: %s", string(eventType), m.GetID(), err.Error()) - } - } - } - }() -} - /* // setFieldValueByJSONTag will parse the struct looking for the field (json tag) and updating the value if found // diff --git a/engine/models_test.go b/engine/models_test.go index 00b766d0..bd315212 100644 --- a/engine/models_test.go +++ b/engine/models_test.go @@ -30,7 +30,8 @@ func TestModelName_String(t *testing.T) { assert.Equal(t, "utxo", ModelUtxo.String()) assert.Equal(t, "xpub", ModelXPub.String()) assert.Equal(t, "contact", ModelContact.String()) - assert.Len(t, AllModelNames, 10) + assert.Equal(t, "webhook", ModelWebhook.String()) + assert.Len(t, AllModelNames, 11) }) } diff --git a/engine/notifications/client.go b/engine/notifications/client.go deleted file mode 100644 index 2086fb4b..00000000 --- a/engine/notifications/client.go +++ /dev/null @@ -1,80 +0,0 @@ -package notifications - -import ( - "github.com/bitcoin-sv/spv-wallet/engine/logging" - "github.com/rs/zerolog" -) - -// EventType event types thrown in SPV Wallet Engine -type EventType string - -const ( - // EventTypeCreate when a new model is created - EventTypeCreate EventType = "create" - - // EventTypeUpdate when a new model is updated - EventTypeUpdate EventType = "update" - - // EventTypeDelete when a new model is deleted - EventTypeDelete EventType = "delete" - - // EventTypeBroadcast when a transaction is broadcasted (sync tx) - EventTypeBroadcast EventType = "broadcast" -) - -type ( - - // Client is the client (configuration) - Client struct { - options *clientOptions - } - - // clientOptions holds all the configuration for the client - clientOptions struct { - config *notificationsConfig // Configuration for broadcasting and other chain-state actions - debug bool // Debugging mode - httpClient HTTPInterface // Custom HTTP client - logger *zerolog.Logger // Custom logger interface - } - - // syncConfig holds all the configuration about the different notifications - notificationsConfig struct { - webhookEndpoint string // Webhook URL for basic notifications - } -) - -// NewClient creates a new client for notifications -func NewClient(opts ...ClientOps) (ClientInterface, error) { - // Create a new client with defaults - client := &Client{ - options: defaultClientOptions(), - } - - // Overwrite defaults with any set by user - for _, opt := range opts { - opt(client.options) - } - - // Set logger if not set - if client.options.logger == nil { - client.options.logger = logging.GetDefaultLogger() - } - - // Return the client - return client, nil -} - -// IsDebug will return if debugging is enabled -func (c *Client) IsDebug() bool { - return c.options.debug -} - -// Debug will set the debug flag -func (c *Client) Debug(on bool) { - c.options.debug = on -} - -// Logger get the logger -func (c *Client) Logger() *zerolog.Logger { - return c.options.logger -} diff --git a/engine/notifications/client_options.go b/engine/notifications/client_options.go deleted file mode 100644 index 6336aaf9..00000000 --- a/engine/notifications/client_options.go +++ /dev/null @@ -1,54 +0,0 @@ -package notifications - -import ( - "net/http" - "time" - - "github.com/rs/zerolog" -) - -const ( - defaultHTTPTimeout = 20 * time.Second -) - -// ClientOps allow functional options to be supplied -// that overwrite default client options. -type ClientOps func(c *clientOptions) - -// defaultClientOptions will return an clientOptions struct with the default settings -// -// Useful for starting with the default and then modifying as needed -func defaultClientOptions() *clientOptions { - - // Set the default options - return &clientOptions{ - config: ¬ificationsConfig{ - webhookEndpoint: "", - }, - logger: nil, - httpClient: &http.Client{ - Timeout: defaultHTTPTimeout, - }, - } -} - -// WithNotifications will set the webhook endpoint -func WithNotifications(webhookEndpoint string) ClientOps { - return func(c *clientOptions) { - c.config.webhookEndpoint = webhookEndpoint - } -} - -// WithLogger will set the logger -func WithLogger(customLogger *zerolog.Logger) ClientOps { - return func(c *clientOptions) { - c.logger = customLogger - } -} - -// WithDebugging will set debugging on notifications -func WithDebugging() ClientOps { - return func(c *clientOptions) { - c.debug = true - } -} diff --git a/engine/notifications/client_options_test.go b/engine/notifications/client_options_test.go deleted file mode 100644 index b63188c3..00000000 --- a/engine/notifications/client_options_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package notifications - -import ( - "net/http" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestWithNotifications(t *testing.T) { - type args struct { - webhookEndpoint string - } - tests := []struct { - name string - args args - want string - }{ - { - name: "empty", - args: args{webhookEndpoint: ""}, - want: "", - }, - { - name: "empty", - args: args{webhookEndpoint: "https://example.com/v1"}, - want: "https://example.com/v1", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - opts := []ClientOps{WithNotifications(tt.args.webhookEndpoint)} - client, err := NewClient(opts...) - assert.NoError(t, err) - assert.Equal(t, tt.want, client.GetWebhookEndpoint()) - }) - } -} - -func Test_defaultClientOptions(t *testing.T) { - tests := []struct { - name string - want *clientOptions - }{ - { - name: "options", - want: &clientOptions{ - config: ¬ificationsConfig{ - webhookEndpoint: "", - }, - httpClient: &http.Client{ - Timeout: defaultHTTPTimeout, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - assert.Equalf(t, tt.want, defaultClientOptions(), "defaultClientOptions()") - }) - } -} diff --git a/engine/notifications/client_test.go b/engine/notifications/client_test.go deleted file mode 100644 index a3bc21c4..00000000 --- a/engine/notifications/client_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package notifications - -/*func TestNewClient(t *testing.T) { - type args struct { - opts []ClientOps - } - tests := []struct { - name string - args args - want ClientInterface - wantErr assert.ErrorAssertionFunc - }{ - { - name: "empty", - args: args{opts: []ClientOps{}}, - want: &Client{options: defaultClientOptions()}, - wantErr: assert.NoError, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := NewClient(tt.args.opts...) - if !tt.wantErr(t, err, fmt.Sprintf("NewClient(%v)", tt.args.opts)) { - return - } - assert.Equalf(t, tt.want, got, "NewClient(%v)", tt.args.opts) - }) - } -}*/ diff --git a/engine/notifications/event_utils.go b/engine/notifications/event_utils.go new file mode 100644 index 00000000..46d6bafe --- /dev/null +++ b/engine/notifications/event_utils.go @@ -0,0 +1,56 @@ +package notifications + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/bitcoin-sv/spv-wallet/models" + + "github.com/pkg/errors" +) + +// InstantinateEvent creates a new instance of the event type passed as a type parameter. +func InstantinateEvent[EventType models.Events]() *EventType { + base := *new(EventType) + return &base +} + +// GetEventNameByType returns the name of the event type passed as a type parameter. +func GetEventNameByType[EventType models.Events]() string { + content := InstantinateEvent[EventType]() + return reflect.TypeOf(content).Elem().Name() +} + +// GetEventName returns the name of the event type passed as a parameter. +func GetEventName[EventType models.Events](instance *EventType) string { + return reflect.TypeOf(instance).Elem().Name() +} + +// GetEventContent returns the content of the raw event passed as a parameter. +func GetEventContent[EventType models.Events](raw *models.RawEvent) (*EventType, error) { + model := InstantinateEvent[EventType]() + if raw.Type != GetEventName(model) { + return nil, fmt.Errorf("Wrong type") + } + + if err := json.Unmarshal(raw.Content, &model); err != nil { + return nil, errors.Wrap(err, "Cannot unmarshall the content json") + } + return model, nil +} + +// NewRawEvent creates a new raw event from actual event object. +func NewRawEvent[EventType models.Events](namedEvent *EventType) *models.RawEvent { + asJSON, _ := json.Marshal(namedEvent) + return &models.RawEvent{ + Type: GetEventName(namedEvent), + Content: asJSON, + } +} + +// Notify is a utility generc function which allows to push a new event to the notification system. +func Notify[EventType models.Events](n *Notifications, event *EventType) { + rawEvent := NewRawEvent(event) + n.Notify(rawEvent) +} diff --git a/engine/notifications/event_utils_test.go b/engine/notifications/event_utils_test.go new file mode 100644 index 00000000..2cd25992 --- /dev/null +++ b/engine/notifications/event_utils_test.go @@ -0,0 +1,32 @@ +package notifications + +import ( + "encoding/json" + "testing" + + "github.com/bitcoin-sv/spv-wallet/models" + "github.com/stretchr/testify/assert" +) + +func TestEventParsing(t *testing.T) { + t.Run("parse the raw event to actual event type", func(t *testing.T) { + source := NewRawEvent(&models.StringEvent{ + Value: "1", + }) + asJSON, _ := json.Marshal(source) + + var target models.RawEvent + _ = json.Unmarshal(asJSON, &target) + assert.Equal(t, source.Type, target.Type) + + actualEvent, err := GetEventContent[models.StringEvent](&target) + assert.NoError(t, err) + assert.Equal(t, "1", actualEvent.Value) + }) + + t.Run("event name", func(t *testing.T) { + assert.Equal(t, "StringEvent", GetEventNameByType[models.StringEvent]()) + var numericEventInstance *models.StringEvent + assert.Equal(t, "StringEvent", GetEventName(numericEventInstance)) + }) +} diff --git a/engine/notifications/interface.go b/engine/notifications/interface.go index cf59fb22..3694b3d9 100644 --- a/engine/notifications/interface.go +++ b/engine/notifications/interface.go @@ -2,21 +2,25 @@ package notifications import ( "context" - "net/http" - - "github.com/rs/zerolog" + "time" ) -// HTTPInterface is the HTTP client interface -type HTTPInterface interface { - Do(req *http.Request) (*http.Response, error) +// ModelWebhook is an interface for a webhook model. +type ModelWebhook interface { + GetURL() string + GetTokenHeader() string + GetTokenValue() string + BanUntil(bannedTo time.Time) + Refresh(tokenHeader, tokenValue string) + Banned() bool + Deleted() bool } -// ClientInterface is the notification client interface -type ClientInterface interface { - Debug(on bool) - GetWebhookEndpoint() string - IsDebug() bool - Logger() *zerolog.Logger - Notify(ctx context.Context, modelType string, eventType EventType, model interface{}, id string) error +// WebhooksRepository is an interface for managing webhooks. +type WebhooksRepository interface { + Create(ctx context.Context, url, tokenHeader, tokenValue string) error + Save(ctx context.Context, model ModelWebhook) error + Delete(ctx context.Context, model ModelWebhook) error + GetAll(ctx context.Context) ([]ModelWebhook, error) + GetByURL(ctx context.Context, url string) (ModelWebhook, error) } diff --git a/engine/notifications/mock_sender.go b/engine/notifications/mock_sender.go new file mode 100644 index 00000000..53c58cb3 --- /dev/null +++ b/engine/notifications/mock_sender.go @@ -0,0 +1,26 @@ +package notifications + +import ( + "context" + "time" + + "github.com/bitcoin-sv/spv-wallet/models" +) + +// StartSendingMockEvents - utility function to start sending some events in a predefined interval. It's useful for testing +func StartSendingMockEvents[EventType models.Events](ctx context.Context, notificationService *Notifications, duration time.Duration, prepare func(i int) *EventType) { + i := 0 + ticker := time.NewTicker(duration) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + Notify(notificationService, prepare(i)) + i++ + } + } + }() +} diff --git a/engine/notifications/notifications.go b/engine/notifications/notifications.go index 70d4f9c0..914c7e1a 100644 --- a/engine/notifications/notifications.go +++ b/engine/notifications/notifications.go @@ -1,60 +1,77 @@ -// Package notifications is a basic internal notifications module package notifications import ( - "bytes" "context" - "encoding/json" - "net/http" + "sync" + "time" + + "github.com/bitcoin-sv/spv-wallet/models" + "github.com/rs/zerolog" ) -// GetWebhookEndpoint will get the configured webhook endpoint -func (c *Client) GetWebhookEndpoint() string { - return c.options.config.webhookEndpoint +const lengthOfInputChannel = 100 + +// Notifications - service for sending events to multiple notifiers +type Notifications struct { + inputChannel chan *models.RawEvent + outputChannels *sync.Map //[string, chan *Event] + burstLogger *zerolog.Logger } -// Notify will create a new notification event -func (c *Client) Notify(ctx context.Context, modelType string, eventType EventType, - model interface{}, id string) error { +// AddNotifier - add notifier by key +func (n *Notifications) AddNotifier(key string, ch chan *models.RawEvent) { + n.outputChannels.Store(key, ch) +} - if len(c.options.config.webhookEndpoint) == 0 { - if c.IsDebug() { - c.Logger().Info().Msgf("NOTIFY %s: %s - %v", eventType, id, model) - } - } else { - jsonData, err := json.Marshal(map[string]interface{}{ - "event_type": eventType, - "id": id, - "model": model, - "model_type": modelType, - }) - if err != nil { - return err - } +// RemoveNotifier - remove notifier by key +func (n *Notifications) RemoveNotifier(key string) { + n.outputChannels.Delete(key) +} - var req *http.Request - if req, err = http.NewRequestWithContext(ctx, - http.MethodPost, - c.options.config.webhookEndpoint, - bytes.NewBuffer(jsonData), - ); err != nil { - return err - } +// Notify - send event to all notifiers +func (n *Notifications) Notify(event *models.RawEvent) { + n.inputChannel <- event +} - var response *http.Response - if response, err = c.options.httpClient.Do(req); err != nil { - return err - } - defer func() { - _ = response.Body.Close() - }() - - if response.StatusCode != http.StatusOK { - // todo queue notification for another try ... - c.Logger().Error().Msgf("received invalid response from notification endpoint: %d", - response.StatusCode) +// exchange - exchange events between input and output channels, uses fan-out pattern +func (n *Notifications) exchange(ctx context.Context) { + for { + select { + case event := <-n.inputChannel: + n.outputChannels.Range(func(_, value any) bool { + ch := value.(chan *models.RawEvent) + n.sendEventToChannel(ch, event) + return true + }) + case <-ctx.Done(): + return } } +} + +// sendEventToChannel - non blocking send event to channel +func (n *Notifications) sendEventToChannel(ch chan *models.RawEvent, event *models.RawEvent) { + select { + case ch <- event: + // Successfully sent event + default: + n.burstLogger.Warn().Msg("Failed to send event to channel") + } +} + +// NewNotifications - creates a new instance of Notifications +func NewNotifications(ctx context.Context, parentLogger *zerolog.Logger) *Notifications { + burstLogger := parentLogger.With().Logger().Sample(&zerolog.BurstSampler{ + Burst: 3, + Period: 30 * time.Second, + }) + n := &Notifications{ + inputChannel: make(chan *models.RawEvent, lengthOfInputChannel), + outputChannels: new(sync.Map), + burstLogger: &burstLogger, + } + + go n.exchange(ctx) - return nil + return n } diff --git a/engine/notifications/notifications_test.go b/engine/notifications/notifications_test.go index f7b83ca8..53bc1677 100644 --- a/engine/notifications/notifications_test.go +++ b/engine/notifications/notifications_test.go @@ -2,93 +2,190 @@ package notifications import ( "context" - "errors" "fmt" - "net/http" "testing" + "time" - "github.com/jarcoal/httpmock" + "github.com/bitcoin-sv/spv-wallet/models" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -func TestClient_Notify(t *testing.T) { - httpmock.Activate() - defer httpmock.DeactivateAndReset() +func newMockEvent(value string) *models.RawEvent { + return NewRawEvent(&models.StringEvent{ + Value: value, + }) +} - ctx := context.Background() - webhookURL := "https://test.example.com/v1/api-endpoint" +type mockNotifier struct { + delay *time.Duration + channel chan *models.RawEvent + output []*models.RawEvent +} - type args struct { - modelType string - eventType EventType - model interface{} - id string +func (m *mockNotifier) consumer(ctx context.Context) { + for { + select { + case event := <-m.channel: + m.output = append(m.output, event) + if m.delay != nil { + sleepWithContext(ctx, *m.delay) + } + case <-ctx.Done(): + return + } } +} - useArgs := args{ - modelType: "transaction", - eventType: EventTypeCreate, - model: map[string]interface{}{}, - id: "test-id", +func (m *mockNotifier) assertOutput(t *testing.T, expected []string) { + assert.Equal(t, len(expected), len(m.output)) + if len(expected) == len(m.output) { + for i := 0; i < len(expected); i++ { + actualEvent, err := GetEventContent[models.StringEvent](m.output[i]) + assert.NoError(t, err) + assert.Equal(t, expected[i], actualEvent.Value) + } } +} - var tests = []struct { - name string - options []ClientOps - args args - wantErr assert.ErrorAssertionFunc - httpCalls int - httpMock func() - }{ - { - name: "empty notification", - options: nil, - args: useArgs, - wantErr: assert.NoError, - httpCalls: 0, - httpMock: func() {}, - }, - { - name: "http call done", - options: []ClientOps{ - WithNotifications(webhookURL), - }, - args: useArgs, - wantErr: assert.NoError, - httpCalls: 1, - httpMock: func() { - httpmock.RegisterResponder(http.MethodPost, webhookURL, - httpmock.NewStringResponder( - http.StatusOK, - `OK`, - ), - ) - }, - }, - { - name: "http error", - options: []ClientOps{ - WithNotifications(webhookURL), - }, - args: useArgs, - wantErr: assert.Error, - httpCalls: 1, - httpMock: func() { - httpmock.RegisterResponder(http.MethodPost, webhookURL, - httpmock.NewErrorResponder(errors.New("error")), - ) - }, - }, +func newMockNotifier(ctx context.Context, chanLength int) *mockNotifier { + notifier := &mockNotifier{ + channel: make(chan *models.RawEvent, chanLength), } - for _, tt := range tests { - httpmock.Reset() - tt.httpMock() - t.Run(tt.name, func(t *testing.T) { - c, err := NewClient(tt.options...) - require.NoError(t, err) - tt.wantErr(t, c.Notify(ctx, tt.args.modelType, tt.args.eventType, tt.args.model, tt.args.id), fmt.Sprintf("Notify(%v, %v, %v, %v, %v)", ctx, tt.args.modelType, tt.args.eventType, tt.args.model, tt.args.id)) - assert.Equal(t, tt.httpCalls, httpmock.GetTotalCallCount()) - }) + + go notifier.consumer(ctx) + return notifier +} + +func sleepWithContext(ctx context.Context, d time.Duration) { + timer := time.NewTimer(d) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + case <-timer.C: } } + +func TestNotifications(t *testing.T) { + t.Run("one notifier", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier := newMockNotifier(ctx, 100) + n.AddNotifier("test", notifier.channel) + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + notifier.assertOutput(t, expected) + }) + + t.Run("two notifiers", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier1 := newMockNotifier(ctx, 100) + notifier2 := newMockNotifier(ctx, 100) + n.AddNotifier("notifier1", notifier1.channel) + n.AddNotifier("notifier2", notifier2.channel) + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + notifier1.assertOutput(t, expected) + notifier2.assertOutput(t, expected) + }) + + t.Run("more notifications than output chan buffer", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + outputChanLength := 10 + numberOfEvents := 50 // 50 > 10 + notifier := newMockNotifier(ctx, outputChanLength) + n.AddNotifier("test", notifier.channel) + + expected := []string{} + for i := 0; i < numberOfEvents; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + // we have to delay of putting new events because the output chan buffer will not contain all of the events in its buffer + // so, this way, we let consumer to pop events from the queue + time.Sleep(1 * time.Millisecond) + expected = append(expected, msg) + } + + time.Sleep(500 * time.Millisecond) + cancel() + + notifier.assertOutput(t, expected) + }) + + t.Run("slow and fast consumers", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + outputChanLength := 10 + numberOfEvents := 50 // 50 > 10 + + notifier1 := newMockNotifier(ctx, outputChanLength) + veryLongDelay := 1 * time.Hour // it means that notifier will pop only one event + notifier1.delay = &veryLongDelay + + notifier2 := newMockNotifier(ctx, outputChanLength) + n.AddNotifier("notifier1", notifier1.channel) + n.AddNotifier("notifier2", notifier2.channel) + + expected := []string{} + for i := 0; i < numberOfEvents; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + time.Sleep(1 * time.Millisecond) + expected = append(expected, msg) + } + + time.Sleep(500 * time.Millisecond) + cancel() + + // even though notifier1 is slow, it should not block notifier2 + notifier1.assertOutput(t, []string{"msg-0"}) + notifier2.assertOutput(t, expected) + }) + + t.Run("buffered and unbuffered channels", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + outputChanLength := 10 + numberOfEvents := 50 // 50 > 10 + + notifier1 := newMockNotifier(ctx, 1) + notifier2 := newMockNotifier(ctx, outputChanLength) + n.AddNotifier("notifier1", notifier1.channel) + n.AddNotifier("notifier2", notifier2.channel) + + expected := []string{} + for i := 0; i < numberOfEvents; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + time.Sleep(1 * time.Millisecond) + expected = append(expected, msg) + } + + time.Sleep(500 * time.Millisecond) + cancel() + + notifier1.assertOutput(t, expected) + notifier2.assertOutput(t, expected) + }) +} diff --git a/engine/notifications/webhook_manager.go b/engine/notifications/webhook_manager.go new file mode 100644 index 00000000..f9362b3d --- /dev/null +++ b/engine/notifications/webhook_manager.go @@ -0,0 +1,203 @@ +package notifications + +import ( + "context" + "sync" + "time" + + "github.com/bitcoin-sv/spv-wallet/engine/spverrors" + "github.com/pkg/errors" + "github.com/rs/zerolog" +) + +type notifierWithCtx struct { + notifier *WebhookNotifier + ctx context.Context + cancelFunc context.CancelFunc +} + +// WebhookManager is a manager for webhooks. It is responsible for creating, updating and removing webhooks. +type WebhookManager struct { + repository WebhooksRepository + rootContext context.Context + cancelAllFunc context.CancelFunc + webhookNotifiers *sync.Map // [string, *notifierWithCtx] + ticker *time.Ticker + updateMsg chan bool + banMsg chan string // url + notifications *Notifications + logger *zerolog.Logger +} + +// NewWebhookManager creates a new WebhookManager. It starts a goroutine which checks for webhook updates. +func NewWebhookManager(ctx context.Context, logger *zerolog.Logger, notifications *Notifications, repository WebhooksRepository) *WebhookManager { + rootContext, cancelAllFunc := context.WithCancel(ctx) + manager := WebhookManager{ + repository: repository, + rootContext: rootContext, + cancelAllFunc: cancelAllFunc, + webhookNotifiers: &sync.Map{}, + ticker: time.NewTicker(5 * time.Second), + notifications: notifications, + updateMsg: make(chan bool), + banMsg: make(chan string), + logger: logger, + } + + go manager.checkForUpdates() + + return &manager +} + +// Stop stops the WebhookManager. +func (w *WebhookManager) Stop() { + w.cancelAllFunc() +} + +// Subscribe subscribes to a webhook. It adds the webhook to the database and starts a notifier for it. +func (w *WebhookManager) Subscribe(ctx context.Context, url, tokenHeader, tokenValue string) error { + found, err := w.repository.GetByURL(ctx, url) + if err != nil { + return errors.Wrap(err, "Failed to check existing webhook in database") + } + if found != nil { + found.Refresh(tokenHeader, tokenValue) + err = w.repository.Save(ctx, found) + } else { + err = w.repository.Create(ctx, url, tokenHeader, tokenValue) + } + + if err != nil { + return errors.Wrap(err, "Failed to store the webhook") + } + + w.updateMsg <- true + return nil +} + +// Unsubscribe unsubscribes from a webhook. It removes the webhook from the database and stops the notifier for it. +func (w *WebhookManager) Unsubscribe(ctx context.Context, url string) error { + model, err := w.repository.GetByURL(ctx, url) + if err != nil || model == nil || model.Deleted() { + return spverrors.ErrWebhookSubscriptionNotFound + } + err = w.repository.Delete(ctx, model) + if err != nil { + return spverrors.ErrWebhookUnsubscriptionFailed + } + w.updateMsg <- true + return nil +} + +func (w *WebhookManager) checkForUpdates() { + defer func() { + w.logger.Info().Msg("WebhookManager stopped") + if err := recover(); err != nil { + w.logger.Warn().Msgf("WebhookManager failed: %v", err) + } + }() + + w.logger.Info().Msg("WebhookManager started") + w.update() + + for { + select { + case <-w.ticker.C: + w.update() + case <-w.updateMsg: + w.update() + case url := <-w.banMsg: + err := w.markWebhookAsBanned(w.rootContext, url) + if err != nil { + w.logger.Warn().Msgf("failed to mark a webhook as banned: %v", err) + } + w.removeNotifier(url) + case <-w.rootContext.Done(): + return + } + } +} + +func (w *WebhookManager) update() { + defer func() { + if err := recover(); err != nil { + w.logger.Warn().Msgf("WebhookManager update failed: %v", err) + } + }() + dbWebhooks, err := w.repository.GetAll(w.rootContext) + if err != nil { + w.logger.Warn().Msgf("failed to get webhooks: %v", err) + return + } + + // filter out banned webhooks + var filteredWebhooks []ModelWebhook + for _, webhook := range dbWebhooks { + if !webhook.Banned() { + filteredWebhooks = append(filteredWebhooks, webhook) + } + } + + // add notifiers which are not in the map + for _, model := range filteredWebhooks { + if _, ok := w.webhookNotifiers.Load(model.GetURL()); !ok { + w.addNotifier(model) + } + } + + // remove notifiers which are not in the database + w.webhookNotifiers.Range(func(key, _ any) bool { + url := key.(string) + if !containsWebhook(filteredWebhooks, url) { + w.removeNotifier(url) + } + return true + }) + + // update definition of remained webhooks + for _, model := range filteredWebhooks { + if item, ok := w.webhookNotifiers.Load(model.GetURL()); ok { + item.(*notifierWithCtx).notifier.Update(model) + } + } +} + +func (w *WebhookManager) addNotifier(model ModelWebhook) { + w.logger.Info().Msgf("Add a webhook notifier. URL: %s", model.GetURL()) + ctx, cancel := context.WithCancel(w.rootContext) + notifier := NewWebhookNotifier(ctx, w.logger, model, w.banMsg) + w.webhookNotifiers.Store(model.GetURL(), ¬ifierWithCtx{notifier: notifier, ctx: ctx, cancelFunc: cancel}) + w.notifications.AddNotifier(model.GetURL(), notifier.Channel) +} + +func (w *WebhookManager) removeNotifier(url string) { + if item, ok := w.webhookNotifiers.Load(url); ok { + w.logger.Info().Msgf("Remove a webhook notifier. URL: %s", url) + item := item.(*notifierWithCtx) + item.cancelFunc() + w.webhookNotifiers.Delete(url) + w.notifications.RemoveNotifier(url) + } +} + +func (w *WebhookManager) markWebhookAsBanned(ctx context.Context, url string) error { + model, err := w.repository.GetByURL(ctx, url) + if err != nil { + return errors.Wrap(err, "Cannot find the webhook model") + } + model.BanUntil(time.Now().Add(banTime)) + err = w.repository.Save(ctx, model) + if err != nil { + return errors.Wrap(err, "Cannot update the webhook model") + } + return nil +} + +func containsWebhook(webhooks []ModelWebhook, url string) bool { + for _, webhook := range webhooks { + if webhook.GetURL() == url { + return true + } + } + return false +} diff --git a/engine/notifications/webhook_manager_test.go b/engine/notifications/webhook_manager_test.go new file mode 100644 index 00000000..664df54d --- /dev/null +++ b/engine/notifications/webhook_manager_test.go @@ -0,0 +1,121 @@ +package notifications + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/jarcoal/httpmock" +) + +type mockRepository struct { + webhooks []ModelWebhook +} + +func (r *mockRepository) Create(_ context.Context, url, tokenHeader, tokenValue string) error { + model := newMockWebhookModel(url, tokenHeader, tokenValue) + r.webhooks = append(r.webhooks, model) + return nil +} + +func (r *mockRepository) Save(_ context.Context, model ModelWebhook) error { + for i, w := range r.webhooks { + if w.GetURL() == model.GetURL() { + r.webhooks[i] = model + return nil + } + } + r.webhooks = append(r.webhooks, model) + return nil +} + +func (r *mockRepository) Delete(_ context.Context, model ModelWebhook) error { + for i, w := range r.webhooks { + if w.GetURL() == model.GetURL() { + webhook := r.webhooks[i].(*mockModelWebhook) + webhook.deleted = true + r.webhooks[i] = webhook + return nil + } + } + return nil +} + +func (r *mockRepository) GetAll(_ context.Context) ([]ModelWebhook, error) { + return r.webhooks, nil +} + +func (r *mockRepository) GetByURL(_ context.Context, url string) (ModelWebhook, error) { + for _, w := range r.webhooks { + if w.GetURL() == url { + return w, nil + } + } + return nil, nil +} + +func TestWebhookManager(t *testing.T) { + t.Run("one webhook notifier previously subscribed", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client := newMockClient("http://localhost:8080") + + ctx, cancel := context.WithCancel(context.Background()) + + n := NewNotifications(ctx, &nopLogger) + repo := &mockRepository{webhooks: []ModelWebhook{newMockWebhookModel(client.url, "", "")}} + + manager := NewWebhookManager(ctx, &nopLogger, n, repo) + time.Sleep(100 * time.Millisecond) // wait for manager to update notifiers + defer manager.Stop() + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + client.assertEvents(t, expected) + client.assertEventsWereSentInBatches(t, true) + }) + + t.Run("one webhook notifier - subscribe", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client := newMockClient("http://localhost:8080") + + ctx, cancel := context.WithCancel(context.Background()) + + n := NewNotifications(ctx, &nopLogger) + repo := &mockRepository{webhooks: []ModelWebhook{newMockWebhookModel(client.url, "", "")}} + + manager := NewWebhookManager(ctx, &nopLogger, n, repo) + time.Sleep(100 * time.Millisecond) + defer manager.Stop() + + manager.Subscribe(ctx, client.url, "", "") + time.Sleep(100 * time.Millisecond) // wait for manager to update notifiers + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + client.assertEvents(t, expected) + client.assertEventsWereSentInBatches(t, true) + }) +} diff --git a/engine/notifications/webhook_notifier.go b/engine/notifications/webhook_notifier.go new file mode 100644 index 00000000..0c1bc77d --- /dev/null +++ b/engine/notifications/webhook_notifier.go @@ -0,0 +1,151 @@ +package notifications + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "sync" + "time" + + "github.com/bitcoin-sv/spv-wallet/models" + "github.com/pkg/errors" + "github.com/rs/zerolog" +) + +const ( + maxBatchSize = 100 + mexRetries = 2 + retriesDelay = 1 * time.Second + banTime = 60 * time.Minute + lengthOfWebhookChannel = 100 +) + +// WebhookNotifier - notifier for sending events to webhook +type WebhookNotifier struct { + Channel chan *models.RawEvent + banMsg chan string + httpClient *http.Client + definition ModelWebhook + definitionMtx sync.Mutex + logger *zerolog.Logger +} + +// NewWebhookNotifier - creates a new instance of WebhookNotifier +func NewWebhookNotifier(ctx context.Context, logger *zerolog.Logger, model ModelWebhook, banMsg chan string) *WebhookNotifier { + log := logger.With().Str("subservice", "WebhookNotifier").Str("webhookUrl", model.GetURL()).Logger() + notifier := &WebhookNotifier{ + Channel: make(chan *models.RawEvent, lengthOfWebhookChannel), + definition: model, + banMsg: banMsg, + httpClient: &http.Client{}, + logger: &log, + } + + go notifier.consumer(ctx) + + return notifier +} + +// Update - updates the webhook model +func (w *WebhookNotifier) Update(model ModelWebhook) { + w.definitionMtx.Lock() + defer w.definitionMtx.Unlock() + + w.definition = model +} + +func (w *WebhookNotifier) currentDefinition() ModelWebhook { + w.definitionMtx.Lock() + defer w.definitionMtx.Unlock() + + return w.definition +} + +// consumer - consumer for webhook notifier +// It accumulates events (produced during http call) and sends them to webhook +// If sending fails, it retries several times +// If sending fails after several retries, it bans notifier for some time +func (w *WebhookNotifier) consumer(ctx context.Context) { + for { + select { + case event := <-w.Channel: + events, done := w.accumulateEvents(ctx, event) + if done { + return + } + var err error + for i := 0; i < mexRetries; i++ { + err = w.sendEventsToWebhook(ctx, events) + if err == nil { + break + } + w.logger.Warn().Msgf("Webhook call was failed: %v", err) + select { + case <-ctx.Done(): + return + case <-time.After(retriesDelay): + } + } + + if err != nil { + w.banMsg <- w.currentDefinition().GetURL() + return + } + case <-ctx.Done(): + return + } + } +} + +func (w *WebhookNotifier) accumulateEvents(ctx context.Context, event *models.RawEvent) (events []*models.RawEvent, done bool) { + events = append(events, event) +loop: + for i := 0; i < maxBatchSize; i++ { + select { + case event := <-w.Channel: + events = append(events, event) + case <-ctx.Done(): + return nil, true + default: + break loop + } + } + return events, false +} + +func (w *WebhookNotifier) sendEventsToWebhook(ctx context.Context, events []*models.RawEvent) (resultError error) { + defer func() { + if r := recover(); r != nil { + w.logger.Warn().Msgf("Webhook call failed: %v", r) + resultError = errors.New("panic") + } + }() + definition := w.currentDefinition() + data, err := json.Marshal(events) + if err != nil { + return errors.Wrap(err, "failed to marshal events") + } + + req, err := http.NewRequestWithContext(ctx, "POST", definition.GetURL(), bytes.NewBuffer(data)) + if err != nil { + return errors.Wrap(err, "failed to create request") + } + + req.Header.Set("Content-Type", "application/json") + tokenHeader, tokenValue := definition.GetTokenHeader(), definition.GetTokenValue() + if tokenHeader != "" { + req.Header.Set(tokenHeader, tokenValue) + } + + resp, err := w.httpClient.Do(req) + if err != nil { + return errors.Wrap(err, "failed to send request") + } + + defer func() { + _ = resp.Body.Close() + }() + + return nil +} diff --git a/engine/notifications/webhook_notifier_test.go b/engine/notifications/webhook_notifier_test.go new file mode 100644 index 00000000..d3c553ee --- /dev/null +++ b/engine/notifications/webhook_notifier_test.go @@ -0,0 +1,330 @@ +package notifications + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/bitcoin-sv/spv-wallet/models" + "github.com/jarcoal/httpmock" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +type mockClient struct { + interceptor func(req *http.Request) (*http.Response, error) + url string + receivedBatches [][]*models.RawEvent +} + +var nopLogger = zerolog.Nop() + +func newMockClient(url string) *mockClient { + mc := &mockClient{ + receivedBatches: make([][]*models.RawEvent, 0), + url: url, + } + + customResponder := func(req *http.Request) (*http.Response, error) { + if mc.interceptor != nil { + res, err := mc.interceptor(req) + if res != nil { + return res, err + } + } + + // Read the body from the incoming request + body, err := io.ReadAll(req.Body) + if err != nil { + return httpmock.NewStringResponse(500, ""), err + } + + var events []*models.RawEvent + err = json.Unmarshal(body, &events) + if err != nil { + return httpmock.NewStringResponse(500, ""), err + } + + mc.receivedBatches = append(mc.receivedBatches, events) + + return httpmock.NewStringResponse(200, "OK"), nil + } + + httpmock.RegisterResponder("POST", url, customResponder) + + return mc +} + +func (mc *mockClient) assertEvents(t *testing.T, expected []string) { + flatten := make([]*models.RawEvent, 0) + for _, batch := range mc.receivedBatches { + flatten = append(flatten, batch...) + } + assert.Equal(t, len(expected), len(flatten)) + if len(expected) == len(flatten) { + for i := 0; i < len(expected); i++ { + actualEvent, err := GetEventContent[models.StringEvent](flatten[i]) + assert.NoError(t, err) + assert.Equal(t, expected[i], actualEvent.Value) + } + } +} + +func (mc *mockClient) assertEventsWereSentInBatches(t *testing.T, expected bool) { + result := false + for _, batch := range mc.receivedBatches { + if len(batch) > 1 { + result = true + break + } + } + assert.Equal(t, expected, result) +} + +type mockModelWebhook struct { + BannedTo *time.Time + URL string + TokenHeader string + TokenValue string + deleted bool +} + +func (m *mockModelWebhook) Banned() bool { + return m.BannedTo != nil +} + +func (m *mockModelWebhook) Deleted() bool { + return m.deleted +} + +func (m *mockModelWebhook) GetURL() string { + return m.URL +} + +func (m *mockModelWebhook) GetTokenHeader() string { + return m.TokenHeader +} + +func (m *mockModelWebhook) GetTokenValue() string { + return m.TokenValue +} + +func (m *mockModelWebhook) BanUntil(bannedTo time.Time) { + m.BannedTo = &bannedTo +} + +func (m *mockModelWebhook) Refresh(tokenHeader, tokenValue string) { + m.BannedTo = nil + m.deleted = false + m.TokenHeader = tokenHeader + m.TokenValue = tokenValue +} + +func newMockWebhookModel(url, tokenHeader, tokenValue string) *mockModelWebhook { + return &mockModelWebhook{ + URL: url, + TokenHeader: tokenHeader, + TokenValue: tokenValue, + } +} + +func TestWebhookNotifier(t *testing.T) { + t.Run("one webhook notifier", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client := newMockClient("http://localhost:8080") + + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client.url, "", ""), make(chan string)) + n.AddNotifier(client.url, notifier.Channel) + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + client.assertEvents(t, expected) + client.assertEventsWereSentInBatches(t, true) + }) + + t.Run("two webhook notifiers", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client1 := newMockClient("http://localhost:8080") + client2 := newMockClient("http://localhost:8081") + + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + + notifier1 := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client1.url, "", ""), make(chan string)) + n.AddNotifier(client1.url, notifier1.Channel) + + notifier2 := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client2.url, "", ""), make(chan string)) + n.AddNotifier(client2.url, notifier2.Channel) + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + client1.assertEvents(t, expected) + client1.assertEventsWereSentInBatches(t, true) + + client2.assertEvents(t, expected) + client2.assertEventsWereSentInBatches(t, true) + }) + + t.Run("no batches when notifications are put slowly", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client := newMockClient("http://localhost:8080") + + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client.url, "", ""), make(chan string)) + n.AddNotifier(client.url, notifier.Channel) + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + time.Sleep(100 * time.Microsecond) + expected = append(expected, msg) + } + + time.Sleep(100 * time.Millisecond) + cancel() + + client.assertEvents(t, expected) + client.assertEventsWereSentInBatches(t, false) + }) + + t.Run("with retries", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client := newMockClient("http://localhost:8080") + k := 0 + client.interceptor = func(_ *http.Request) (*http.Response, error) { + if k < 1 { + k++ + return httpmock.NewStringResponse(408, ""), fmt.Errorf("Timeout") + } + return nil, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client.url, "", ""), make(chan string)) + n.AddNotifier(client.url, notifier.Channel) + + expected := []string{} + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + expected = append(expected, msg) + } + + time.Sleep(1500 * time.Millisecond) + cancel() + + client.assertEvents(t, expected) + }) + + t.Run("ban webhook", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + client := newMockClient("http://localhost:8080") + client.interceptor = func(_ *http.Request) (*http.Response, error) { + return httpmock.NewStringResponse(408, ""), fmt.Errorf("Timeout") + } + + banMsg := make(chan string) + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client.url, "", ""), banMsg) + n.AddNotifier(client.url, notifier.Channel) + + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + } + + banHasBeenTriggered := false + loop: + for { + select { + case <-time.After(3 * time.Second): + break loop + case url := <-banMsg: + if url == client.url { + banHasBeenTriggered = true + } + } + } + cancel() + + assert.Equal(t, true, banHasBeenTriggered) + }) + + t.Run("with token", func(t *testing.T) { + httpmock.Reset() + httpmock.Activate() + defer httpmock.Deactivate() + + tokenHeader := "test" + tokenValue := "token" + + waitForCall := make(chan bool) + client := newMockClient("http://localhost:8080") + allGood := false + client.interceptor = func(req *http.Request) (*http.Response, error) { + defer func() { + waitForCall <- true + }() + if tokenValue == req.Header.Get(tokenHeader) { + allGood = true + } + return nil, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + n := NewNotifications(ctx, &nopLogger) + notifier := NewWebhookNotifier(ctx, &nopLogger, newMockWebhookModel(client.url, tokenHeader, tokenValue), make(chan string)) + n.AddNotifier(client.url, notifier.Channel) + + for i := 0; i < 10; i++ { + msg := fmt.Sprintf("msg-%d", i) + n.Notify(newMockEvent(msg)) + } + + <-waitForCall + cancel() + + assert.Equal(t, true, allGood) + }) +} diff --git a/engine/spverrors/definitions.go b/engine/spverrors/definitions.go index 27f09ec4..a967ec9e 100644 --- a/engine/spverrors/definitions.go +++ b/engine/spverrors/definitions.go @@ -331,3 +331,17 @@ var ErrMissingClient = models.SPVError{Message: "client is missing from model, c // ErrDatastoreRequired is when a datastore function is called without a datastore present var ErrDatastoreRequired = models.SPVError{Message: "datastore is required", StatusCode: 500, Code: "error-datastore-required"} + +//////////////////////////////////// NOTIFICATION ERRORS + +// ErrWebhookSubscriptionFailed is when webhook subscription failed +var ErrWebhookSubscriptionFailed = models.SPVError{Message: "webhook subscription failed", StatusCode: 500, Code: "error-webhook-subscription-failed"} + +// ErrWebhookUnsubscriptionFailed is when webhook unsubscription failed +var ErrWebhookUnsubscriptionFailed = models.SPVError{Message: "webhook unsubscription failed", StatusCode: 500, Code: "error-webhook-unsubscription-failed"} + +// ErrWebhookSubscriptionNotFound is when cannot find webhook to unsubscribe +var ErrWebhookSubscriptionNotFound = models.SPVError{Message: "webhook subscription not found", StatusCode: 404, Code: "error-webhook-subscription-not-found"} + +// ErrNotificationsDisabled happens when the notifications are not enabled in the config +var ErrNotificationsDisabled = models.SPVError{Message: "notifications are disabled", StatusCode: 404, Code: "error-notifications-disabled"} diff --git a/engine/sync_tx_service.go b/engine/sync_tx_service.go index e2cdea56..6e6bbc14 100644 --- a/engine/sync_tx_service.go +++ b/engine/sync_tx_service.go @@ -12,7 +12,6 @@ import ( "github.com/bitcoin-sv/go-paymail" "github.com/bitcoin-sv/spv-wallet/engine/chainstate" "github.com/bitcoin-sv/spv-wallet/engine/datastore" - "github.com/bitcoin-sv/spv-wallet/engine/notifications" "github.com/bitcoin-sv/spv-wallet/engine/spverrors" ) @@ -155,9 +154,6 @@ func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) erro return err } - // Fire a notification - notify(notifications.EventTypeBroadcast, syncTx) - return nil } diff --git a/models/notifications.go b/models/notifications.go new file mode 100644 index 00000000..76e4234e --- /dev/null +++ b/models/notifications.go @@ -0,0 +1,46 @@ +package models + +import "encoding/json" + +// SubscribeRequestBody represents the request body for the subscribe endpoint. +type SubscribeRequestBody struct { + URL string `json:"url"` + TokenHeader string `json:"tokenHeader"` + TokenValue string `json:"tokenValue"` +} + +// UnsubscribeRequestBody represents the request body for the unsubscribe endpoint. +type UnsubscribeRequestBody struct { + URL string `json:"url"` +} + +// RawEvent - the base event type +type RawEvent struct { + Type string `json:"type"` + Content json.RawMessage `json:"content"` +} + +// StringEvent - event with string value; can be used for generic messages and it's used for testing +type StringEvent struct { + Value string `json:"value"` +} + +// UserEvent - event with user identifier +type UserEvent struct { + XPubID string `json:"xpubId"` +} + +// TransactionEvent - event for transaction changes +type TransactionEvent struct { + UserEvent `json:",inline"` + + TransactionID string `json:"transactionId"` + Status string `json:"status"` +} + +// NOTICE: If you add a new event type, you must also update the Events interface + +// Events - interface for all supported events +type Events interface { + StringEvent | TransactionEvent +}