From 52a869a26a43bd2412760218a2cba3d97f04e8f4 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Thu, 6 Jul 2023 09:47:28 +0200 Subject: [PATCH] feat: clean connectors + add polling for connectors (#417) --- .../payments-v0.7.0-post-upgrade.yaml | 18 +++ .../payments-v0.7.0-pre-upgrade.yaml | 18 +++ .../payments-v0.7.0-post-upgrade.yaml | 18 +++ .../payments-v0.7.0-pre-upgrade.yaml | 18 +++ .../payments-v0.7.0-post-upgrade.yaml | 18 +++ .../payments-v0.7.0-pre-upgrade.yaml | 18 +++ .../payments-v0.7.0-post-upgrade.yaml | 18 +++ .../payments-v0.7.0-pre-upgrade.yaml | 18 +++ .../internal/handlers/handler_payments.go | 52 +++--- .../connectors/bankingcircle/client/auth.go | 69 ++++++++ .../connectors/bankingcircle/client/client.go | 71 ++++++++ .../{client.go => client/payments.go} | 152 +----------------- .../app/connectors/bankingcircle/config.go | 15 +- .../app/connectors/bankingcircle/connector.go | 66 ++++++++ .../app/connectors/bankingcircle/loader.go | 49 +++--- .../bankingcircle/task_fetch_payments.go | 87 ++++++---- .../app/connectors/bankingcircle/task_main.go | 38 +++++ .../connectors/bankingcircle/task_resolve.go | 20 +-- .../connectors/currencycloud/client/auth.go | 5 +- .../connectors/currencycloud/client/client.go | 5 +- .../app/connectors/currencycloud/config.go | 56 +------ .../app/connectors/currencycloud/connector.go | 23 ++- .../app/connectors/currencycloud/loader.go | 6 + .../currencycloud/task_fetch_transactions.go | 12 +- .../app/connectors/currencycloud/task_main.go | 38 +++++ .../connectors/currencycloud/task_resolve.go | 29 ++-- .../app/connectors/dummypay/connector.go | 12 +- .../app/connectors/dummypay/connector_test.go | 2 +- .../connectors/dummypay/task_read_files.go | 5 +- .../mangopay/client/transactions.go | 25 +-- .../app/connectors/mangopay/config.go | 9 +- .../app/connectors/mangopay/connector.go | 14 +- .../app/connectors/mangopay/loader.go | 6 + .../mangopay/task_fetch_transactions.go | 78 +++++---- .../connectors/mangopay/task_fetch_users.go | 8 +- .../app/connectors/mangopay/task_main.go | 38 +++++ .../app/connectors/mangopay/task_resolve.go | 11 +- .../internal/app/connectors/modulr/config.go | 9 +- .../app/connectors/modulr/connector.go | 14 +- .../internal/app/connectors/modulr/loader.go | 6 + .../connectors/modulr/task_fetch_accounts.go | 8 +- .../app/connectors/modulr/task_main.go | 38 +++++ .../app/connectors/modulr/task_resolve.go | 3 + .../connectors/moneycorp/client/accounts.go | 4 + .../app/connectors/moneycorp/client/auth.go | 2 +- .../app/connectors/moneycorp/client/client.go | 4 - .../moneycorp/client/transactions.go | 29 +--- .../app/connectors/moneycorp/config.go | 9 +- .../app/connectors/moneycorp/connector.go | 14 +- .../app/connectors/moneycorp/loader.go | 6 + .../moneycorp/task_fetch_accounts.go | 8 +- .../moneycorp/task_fetch_transactions.go | 98 ++++++----- .../app/connectors/moneycorp/task_main.go | 38 +++++ .../app/connectors/moneycorp/task_resolve.go | 3 + .../app/connectors/stripe/connector.go | 12 +- .../app/connectors/stripe/task_main.go | 5 +- .../app/connectors/wise/client/client.go | 42 +++++ .../app/connectors/wise/client/profiles.go | 35 ++++ .../app/connectors/wise/client/quotes.go | 46 ++++++ .../wise/{client.go => client/transfers.go} | 122 ++------------ .../internal/app/connectors/wise/config.go | 5 +- .../internal/app/connectors/wise/connector.go | 15 +- .../internal/app/connectors/wise/loader.go | 6 + .../connectors/wise/task_fetch_profiles.go | 12 +- .../connectors/wise/task_fetch_transfers.go | 15 +- .../internal/app/connectors/wise/task_main.go | 38 +++++ .../app/connectors/wise/task_resolve.go | 7 +- .../app/connectors/wise/task_transfer.go | 10 +- .../internal/app/integration/manager_test.go | 5 +- .../payments/internal/app/models/task.go | 33 ++-- .../internal/app/storage/migrations.go | 12 ++ .../payments/internal/app/storage/task.go | 20 ++- .../payments/internal/app/task/scheduler.go | 111 +++++++++---- .../internal/app/task/scheduler_test.go | 40 ++++- .../payments/internal/app/task/store.go | 2 +- .../payments/internal/app/task/storememory.go | 9 +- 76 files changed, 1376 insertions(+), 664 deletions(-) create mode 100644 components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml create mode 100644 components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml create mode 100644 components/payments/internal/app/connectors/bankingcircle/client/auth.go create mode 100644 components/payments/internal/app/connectors/bankingcircle/client/client.go rename components/payments/internal/app/connectors/bankingcircle/{client.go => client/payments.go} (54%) create mode 100644 components/payments/internal/app/connectors/bankingcircle/connector.go create mode 100644 components/payments/internal/app/connectors/bankingcircle/task_main.go create mode 100644 components/payments/internal/app/connectors/currencycloud/task_main.go create mode 100644 components/payments/internal/app/connectors/mangopay/task_main.go create mode 100644 components/payments/internal/app/connectors/modulr/task_main.go create mode 100644 components/payments/internal/app/connectors/moneycorp/task_main.go create mode 100644 components/payments/internal/app/connectors/wise/client/client.go create mode 100644 components/payments/internal/app/connectors/wise/client/profiles.go create mode 100644 components/payments/internal/app/connectors/wise/client/quotes.go rename components/payments/internal/app/connectors/wise/{client.go => client/transfers.go} (50%) create mode 100644 components/payments/internal/app/connectors/wise/task_main.go diff --git a/components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml new file mode 100644 index 000000000..c00fddfcf --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-post-upgrade + namespace: monopod-latest +spec: + configuration: monopod-latest + module: payments + postUpgrade: true + targetedVersion: v0.7.0 + version: monopod-latest +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml new file mode 100644 index 000000000..4f90e0a51 --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/monopod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-pre-upgrade + namespace: monopod-latest +spec: + configuration: monopod-latest + module: payments + postUpgrade: false + targetedVersion: v0.7.0 + version: monopod-latest +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml new file mode 100644 index 000000000..73249957a --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-post-upgrade + namespace: monopod-ledgerv1 +spec: + configuration: monopod-ledgerv1 + module: payments + postUpgrade: true + targetedVersion: v0.7.0 + version: monopod-ledgerv1 +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml new file mode 100644 index 000000000..a4ccfb11d --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/monopod-ledgerv1/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-pre-upgrade + namespace: monopod-ledgerv1 +spec: + configuration: monopod-ledgerv1 + module: payments + postUpgrade: false + targetedVersion: v0.7.0 + version: monopod-ledgerv1 +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml new file mode 100644 index 000000000..5fdb66cdc --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-post-upgrade + namespace: monopod-search-before-v0-7-0 +spec: + configuration: monopod-search-before-v0-7-0 + module: payments + postUpgrade: true + targetedVersion: v0.7.0 + version: monopod-search-before-v0-7-0 +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml new file mode 100644 index 000000000..15b740a7b --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/monopod-search-before-v0.7.0/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-pre-upgrade + namespace: monopod-search-before-v0-7-0 +spec: + configuration: monopod-search-before-v0-7-0 + module: payments + postUpgrade: false + targetedVersion: v0.7.0 + version: monopod-search-before-v0-7-0 +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml new file mode 100644 index 000000000..a215460f7 --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-post-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-post-upgrade + namespace: multipod-latest +spec: + configuration: multipod-latest + module: payments + postUpgrade: true + targetedVersion: v0.7.0 + version: multipod-latest +status: + terminated: true diff --git a/components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml new file mode 100644 index 000000000..1a588821d --- /dev/null +++ b/components/operator/internal/controllers/stack/testdata/multipod-latest/results/migrations-stack.formance.com-v1beta3/payments-v0.7.0-pre-upgrade.yaml @@ -0,0 +1,18 @@ +apiVersion: stack.formance.com/v1beta3 +kind: Migration +metadata: + annotations: + reloader.stakater.com/auto: "true" + generation: 1 + labels: + stack: "true" + name: payments-v0.7.0-pre-upgrade + namespace: multipod-latest +spec: + configuration: multipod-latest + module: payments + postUpgrade: false + targetedVersion: v0.7.0 + version: multipod-latest +status: + terminated: true diff --git a/components/operator/internal/handlers/handler_payments.go b/components/operator/internal/handlers/handler_payments.go index a4a394697..bb5b0da39 100644 --- a/components/operator/internal/handlers/handler_payments.go +++ b/components/operator/internal/handlers/handler_payments.go @@ -10,6 +10,7 @@ import ( "github.com/formancehq/payments/cmd" "github.com/uptrace/bun" "github.com/uptrace/bun/dialect/pgdialect" + "sigs.k8s.io/controller-runtime/pkg/log" ) func init() { @@ -62,24 +63,12 @@ func init() { return paymentsPreUpgradeMigration(ctx) }, PostUpgrade: func(ctx modules.PostInstallContext) error { - if err := resetConnectors(ctx, "stripe"); err != nil { - return err - } - if err := resetConnectors(ctx, "wise"); err != nil { - return err - } - if err := resetConnectors(ctx, "modulr"); err != nil { - return err - } - if err := resetConnectors(ctx, "banking-circle"); err != nil { - return err - } - if err := resetConnectors(ctx, "currency-cloud"); err != nil { - return err - } - if err := resetConnectors(ctx, "dummy-pay"); err != nil { - return err - } + resetConnectors(ctx, "stripe") + resetConnectors(ctx, "wise") + resetConnectors(ctx, "modulr") + resetConnectors(ctx, "banking-circle") + resetConnectors(ctx, "currency-cloud") + resetConnectors(ctx, "dummy-pay") return nil }, Services: func(ctx modules.ModuleContext) modules.Services { @@ -102,6 +91,25 @@ func init() { return paymentsServices(ctx, env) }, }, + "v0.7.0": { + PreUpgrade: func(ctx modules.Context) error { + return paymentsPreUpgradeMigration(ctx) + }, + PostUpgrade: func(ctx modules.PostInstallContext) error { + resetConnectors(ctx, "stripe") + resetConnectors(ctx, "wise") + resetConnectors(ctx, "modulr") + resetConnectors(ctx, "banking-circle") + resetConnectors(ctx, "currency-cloud") + resetConnectors(ctx, "dummy-pay") + resetConnectors(ctx, "mangopay") + resetConnectors(ctx, "moneycorp") + return nil + }, + Services: func(ctx modules.ModuleContext) modules.Services { + return paymentsServices(ctx, env) + }, + }, }, }) } @@ -148,7 +156,7 @@ func paymentsServices( }} } -func resetConnectors(ctx modules.PostInstallContext, connector string) error { +func resetConnectors(ctx modules.PostInstallContext, connector string) { endpoint := fmt.Sprintf( "http://payments.%s.svc:%d/connectors/%s/reset", ctx.Stack.Name, @@ -156,5 +164,9 @@ func resetConnectors(ctx modules.PostInstallContext, connector string) error { connector, ) _, err := http.Post(endpoint, "", nil) - return err + if err != nil { + logger := log.FromContext(ctx) + logger.WithValues("endpoint", endpoint).Error(err, "failed to reset connector") + // Do not return any error here, as the connector is not required to be installed + } } diff --git a/components/payments/internal/app/connectors/bankingcircle/client/auth.go b/components/payments/internal/app/connectors/bankingcircle/client/auth.go new file mode 100644 index 000000000..c7f554013 --- /dev/null +++ b/components/payments/internal/app/connectors/bankingcircle/client/auth.go @@ -0,0 +1,69 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "time" +) + +func (c *Client) login(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody) + if err != nil { + return fmt.Errorf("failed to create login request: %w", err) + } + + req.SetBasicAuth(c.username, c.password) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to login: %w", err) + } + + defer func() { + err = resp.Body.Close() + if err != nil { + c.logger.Error(err) + } + }() + + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read login response body: %w", err) + } + + //nolint:tagliatelle // allow for client-side structures + type response struct { + AccessToken string `json:"access_token"` + ExpiresIn string `json:"expires_in"` + } + + var res response + + if err = json.Unmarshal(responseBody, &res); err != nil { + return fmt.Errorf("failed to unmarshal login response: %w", err) + } + + c.accessToken = res.AccessToken + + expiresIn, err := strconv.Atoi(res.ExpiresIn) + if err != nil { + return fmt.Errorf("failed to convert expires_in to int: %w", err) + } + + c.accessTokenExpiresAt = time.Now().Add(time.Duration(expiresIn) * time.Second) + + return nil +} + +func (c *Client) ensureAccessTokenIsValid(ctx context.Context) error { + if c.accessTokenExpiresAt.After(time.Now().Add(5 * time.Second)) { + return nil + } + + return c.login(ctx) +} diff --git a/components/payments/internal/app/connectors/bankingcircle/client/client.go b/components/payments/internal/app/connectors/bankingcircle/client/client.go new file mode 100644 index 000000000..f65dbe32c --- /dev/null +++ b/components/payments/internal/app/connectors/bankingcircle/client/client.go @@ -0,0 +1,71 @@ +package client + +import ( + "context" + "crypto/tls" + "net/http" + "time" + + "github.com/formancehq/stack/libs/go-libs/logging" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +type Client struct { + httpClient *http.Client + + username string + password string + + endpoint string + authorizationEndpoint string + + logger logging.Logger + + accessToken string + accessTokenExpiresAt time.Time +} + +func newHTTPClient(userCertificate, userCertificateKey string) (*http.Client, error) { + cert, err := tls.X509KeyPair([]byte(userCertificate), []byte(userCertificateKey)) + if err != nil { + return nil, err + } + + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + + return &http.Client{ + Timeout: 10 * time.Second, + Transport: otelhttp.NewTransport(tr), + }, nil +} + +func NewClient( + username, password, + endpoint, authorizationEndpoint, + uCertificate, uCertificateKey string, + logger logging.Logger) (*Client, error) { + httpClient, err := newHTTPClient(uCertificate, uCertificateKey) + if err != nil { + return nil, err + } + + c := &Client{ + httpClient: httpClient, + + username: username, + password: password, + endpoint: endpoint, + authorizationEndpoint: authorizationEndpoint, + + logger: logger, + } + + if err := c.login(context.TODO()); err != nil { + return nil, err + } + + return c, nil +} diff --git a/components/payments/internal/app/connectors/bankingcircle/client.go b/components/payments/internal/app/connectors/bankingcircle/client/payments.go similarity index 54% rename from components/payments/internal/app/connectors/bankingcircle/client.go rename to components/payments/internal/app/connectors/bankingcircle/client/payments.go index 9939ccd3b..e9ffc1d50 100644 --- a/components/payments/internal/app/connectors/bankingcircle/client.go +++ b/components/payments/internal/app/connectors/bankingcircle/client/payments.go @@ -1,139 +1,16 @@ -package bankingcircle +package client import ( "context" - "crypto/tls" "encoding/json" "fmt" "io" "net/http" - "strconv" "time" - - "github.com/formancehq/stack/libs/go-libs/logging" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) -type client struct { - httpClient *http.Client - - username string - password string - - endpoint string - authorizationEndpoint string - - logger logging.Logger - - accessToken string - accessTokenExpiresAt time.Time -} - -func newHTTPClient(userCertificate, userCertificateKey string) (*http.Client, error) { - cert, err := tls.X509KeyPair([]byte(userCertificate), []byte(userCertificateKey)) - if err != nil { - return nil, err - } - - tr := http.DefaultTransport.(*http.Transport).Clone() - tr.TLSClientConfig = &tls.Config{ - Certificates: []tls.Certificate{cert}, - } - - return &http.Client{ - Timeout: 10 * time.Second, - Transport: otelhttp.NewTransport(tr), - }, nil -} - -func newClient( - username, password, - endpoint, authorizationEndpoint, - uCertificate, uCertificateKey string, - logger logging.Logger) (*client, error) { - httpClient, err := newHTTPClient(uCertificate, uCertificateKey) - if err != nil { - return nil, err - } - - c := &client{ - httpClient: httpClient, - - username: username, - password: password, - endpoint: endpoint, - authorizationEndpoint: authorizationEndpoint, - - logger: logger, - } - - if err := c.login(context.TODO()); err != nil { - return nil, err - } - - return c, nil -} - -func (c *client) login(ctx context.Context) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, - c.authorizationEndpoint+"/api/v1/authorizations/authorize", http.NoBody) - if err != nil { - return fmt.Errorf("failed to create login request: %w", err) - } - - req.SetBasicAuth(c.username, c.password) - - resp, err := c.httpClient.Do(req) - if err != nil { - return fmt.Errorf("failed to login: %w", err) - } - - defer func() { - err = resp.Body.Close() - if err != nil { - c.logger.Error(err) - } - }() - - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("failed to read login response body: %w", err) - } - - //nolint:tagliatelle // allow for client-side structures - type response struct { - AccessToken string `json:"access_token"` - ExpiresIn string `json:"expires_in"` - } - - var res response - - if err = json.Unmarshal(responseBody, &res); err != nil { - return fmt.Errorf("failed to unmarshal login response: %w", err) - } - - c.accessToken = res.AccessToken - - expiresIn, err := strconv.Atoi(res.ExpiresIn) - if err != nil { - return fmt.Errorf("failed to convert expires_in to int: %w", err) - } - - c.accessTokenExpiresAt = time.Now().Add(time.Duration(expiresIn) * time.Second) - - return nil -} - -func (c *client) ensureAccessTokenIsValid(ctx context.Context) error { - if c.accessTokenExpiresAt.After(time.Now()) { - return nil - } - - return c.login(ctx) -} - //nolint:tagliatelle // allow for client-side structures -type payment struct { +type Payment struct { PaymentID string `json:"paymentId"` TransactionReference string `json:"transactionReference"` ConcurrencyToken string `json:"concurrencyToken"` @@ -201,26 +78,7 @@ type payment struct { } `json:"creditorInformation"` } -func (c *client) getAllPayments(ctx context.Context) ([]*payment, error) { - var payments []*payment - - for page := 1; ; page++ { - pagedPayments, err := c.getPayments(ctx, page) - if err != nil { - return nil, err - } - - if len(pagedPayments) == 0 { - break - } - - payments = append(payments, pagedPayments...) - } - - return payments, nil -} - -func (c *client) getPayments(ctx context.Context, page int) ([]*payment, error) { +func (c *Client) GetPayments(ctx context.Context, page int) ([]*Payment, error) { if err := c.ensureAccessTokenIsValid(ctx); err != nil { return nil, err } @@ -231,7 +89,7 @@ func (c *client) getPayments(ctx context.Context, page int) ([]*payment, error) } q := req.URL.Query() - q.Add("PageSize", "5000") + q.Add("PageSize", "100") q.Add("PageNumber", fmt.Sprint(page)) req.URL.RawQuery = q.Encode() @@ -256,7 +114,7 @@ func (c *client) getPayments(ctx context.Context, page int) ([]*payment, error) } type response struct { - Result []*payment `json:"result"` + Result []*Payment `json:"result"` PageInfo struct { CurrentPage int `json:"currentPage"` PageSize int `json:"pageSize"` diff --git a/components/payments/internal/app/connectors/bankingcircle/config.go b/components/payments/internal/app/connectors/bankingcircle/config.go index e645ad080..2a7387109 100644 --- a/components/payments/internal/app/connectors/bankingcircle/config.go +++ b/components/payments/internal/app/connectors/bankingcircle/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) @@ -16,12 +17,13 @@ import ( // openssl pkcs12 -in PC20230412293693.pfx -clcerts -nokeys | sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > clientcert.cer // openssl pkcs12 -in PC20230412293693.pfx -nocerts -nodes | sed -ne '/-BEGIN PRIVATE KEY-/,/-END PRIVATE KEY-/p' > clientcert.key type Config struct { - Username string `json:"username" yaml:"username" bson:"username"` - Password string `json:"password" yaml:"password" bson:"password"` - Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` - AuthorizationEndpoint string `json:"authorizationEndpoint" yaml:"authorizationEndpoint" bson:"authorizationEndpoint"` - UserCertificate string `json:"userCertificate" yaml:"userCertificate" bson:"userCertificate"` - UserCertificateKey string `json:"userCertificateKey" yaml:"userCertificateKey" bson:"userCertificateKey"` + Username string `json:"username" yaml:"username" bson:"username"` + Password string `json:"password" yaml:"password" bson:"password"` + Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` + AuthorizationEndpoint string `json:"authorizationEndpoint" yaml:"authorizationEndpoint" bson:"authorizationEndpoint"` + UserCertificate string `json:"userCertificate" yaml:"userCertificate" bson:"userCertificate"` + UserCertificateKey string `json:"userCertificateKey" yaml:"userCertificateKey" bson:"userCertificateKey"` + PollingPeriod connectors.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` } // String obfuscates sensitive fields and returns a string representation of the config. @@ -71,6 +73,7 @@ func (c Config) BuildTemplate() (string, configtemplate.Config) { cfg.AddParameter("authorizationEndpoint", configtemplate.TypeString, true) cfg.AddParameter("userCertificate", configtemplate.TypeLongString, true) cfg.AddParameter("userCertificateKey", configtemplate.TypeLongString, true) + cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, false) return Name.String(), cfg } diff --git a/components/payments/internal/app/connectors/bankingcircle/connector.go b/components/payments/internal/app/connectors/bankingcircle/connector.go new file mode 100644 index 000000000..fa6125d27 --- /dev/null +++ b/components/payments/internal/app/connectors/bankingcircle/connector.go @@ -0,0 +1,66 @@ +package bankingcircle + +import ( + "context" + + "github.com/formancehq/payments/internal/app/integration" + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +const Name = models.ConnectorProviderBankingCircle + +type Connector struct { + logger logging.Logger + cfg Config +} + +func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + // TODO implement me + panic("implement me") +} + +func (c *Connector) Install(ctx task.ConnectorContext) error { + taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Main task to periodically fetch payments", + Key: taskNameMain, + }) + if err != nil { + return err + } + + return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, models.TaskSchedulerOptions{ + // We want to polling every c.cfg.PollingPeriod.Duration seconds the users + // and their transactions. + ScheduleOption: models.OPTIONS_RUN_INDEFINITELY, + Duration: c.cfg.PollingPeriod.Duration, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) +} + +func (c *Connector) Uninstall(ctx context.Context) error { + return nil +} + +func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task { + taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](descriptor) + if err != nil { + panic(err) + } + + return resolveTasks(c.logger, c.cfg)(taskDescriptor) +} + +var _ integration.Connector = &Connector{} + +func newConnector(logger logging.Logger, cfg Config) *Connector { + return &Connector{ + logger: logger.WithFields(map[string]any{ + "component": "connector", + }), + cfg: cfg, + } +} diff --git a/components/payments/internal/app/connectors/bankingcircle/loader.go b/components/payments/internal/app/connectors/bankingcircle/loader.go index c58a6c8f2..41b355aae 100644 --- a/components/payments/internal/app/connectors/bankingcircle/loader.go +++ b/components/payments/internal/app/connectors/bankingcircle/loader.go @@ -1,33 +1,38 @@ package bankingcircle import ( + "time" + "github.com/formancehq/payments/internal/app/integration" "github.com/formancehq/payments/internal/app/models" - "github.com/formancehq/payments/internal/app/task" "github.com/formancehq/stack/libs/go-libs/logging" ) -const Name = models.ConnectorProviderBankingCircle +type Loader struct{} + +const allowedTasks = 50 + +func (l *Loader) AllowTasks() int { + return allowedTasks +} + +func (l *Loader) Name() models.ConnectorProvider { + return Name +} + +func (l *Loader) Load(logger logging.Logger, config Config) integration.Connector { + return newConnector(logger, config) +} + +func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PollingPeriod.Duration == 0 { + cfg.PollingPeriod.Duration = 2 * time.Minute + } + + return cfg +} // NewLoader creates a new loader. -func NewLoader() integration.Loader[Config] { - loader := integration.NewLoaderBuilder[Config](Name). - WithLoad(func(logger logging.Logger, config Config) integration.Connector { - return integration.NewConnectorBuilder(). - WithInstall(func(ctx task.ConnectorContext) error { - taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ - Name: "Fetch payments from source", - Key: taskNameFetchPayments, - }) - if err != nil { - return err - } - - return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, false) - }). - WithResolve(resolveTasks(logger, config)). - Build() - }).Build() - - return loader +func NewLoader() *Loader { + return &Loader{} } diff --git a/components/payments/internal/app/connectors/bankingcircle/task_fetch_payments.go b/components/payments/internal/app/connectors/bankingcircle/task_fetch_payments.go index 0d7ad7557..944c792d9 100644 --- a/components/payments/internal/app/connectors/bankingcircle/task_fetch_payments.go +++ b/components/payments/internal/app/connectors/bankingcircle/task_fetch_payments.go @@ -4,61 +4,80 @@ import ( "context" "encoding/json" - "github.com/formancehq/payments/internal/app/models" - + "github.com/formancehq/payments/internal/app/connectors/bankingcircle/client" "github.com/formancehq/payments/internal/app/ingestion" + "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/payments/internal/app/task" - "github.com/formancehq/stack/libs/go-libs/logging" ) -func taskFetchPayments(logger logging.Logger, client *client) task.Task { +func taskFetchPayments(logger logging.Logger, client *client.Client) task.Task { return func( ctx context.Context, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - paymentsList, err := client.getAllPayments(ctx) - if err != nil { - return err - } - - batch := ingestion.PaymentBatch{} + for page := 1; ; page++ { + pagedPayments, err := client.GetPayments(ctx, page) + if err != nil { + return err + } - for _, paymentEl := range paymentsList { - logger.Info(paymentEl) + if len(pagedPayments) == 0 { + break + } - raw, err := json.Marshal(paymentEl) - if err != nil { + if err := ingestBatch(ctx, ingester, pagedPayments); err != nil { return err } + } - paymentType := matchPaymentType(paymentEl.Classification) + return nil + } +} + +func ingestBatch( + ctx context.Context, + ingester ingestion.Ingester, + payments []*client.Payment, +) error { + batch := ingestion.PaymentBatch{} + + for _, paymentEl := range payments { + raw, err := json.Marshal(paymentEl) + if err != nil { + return err + } - batchElement := ingestion.PaymentBatchElement{ - Payment: &models.Payment{ - ID: models.PaymentID{ - PaymentReference: models.PaymentReference{ - Reference: paymentEl.TransactionReference, - Type: paymentType, - }, - Provider: models.ConnectorProviderBankingCircle, + paymentType := matchPaymentType(paymentEl.Classification) + + batchElement := ingestion.PaymentBatchElement{ + Payment: &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: paymentEl.TransactionReference, + Type: paymentType, }, - Reference: paymentEl.TransactionReference, - Type: paymentType, - Status: matchPaymentStatus(paymentEl.Status), - Scheme: models.PaymentSchemeOther, - Amount: int64(paymentEl.Transfer.Amount.Amount * 100), - Asset: models.PaymentAsset(paymentEl.Transfer.Amount.Currency + "/2"), - RawData: raw, + Provider: models.ConnectorProviderBankingCircle, }, - } - - batch = append(batch, batchElement) + Reference: paymentEl.TransactionReference, + Type: paymentType, + Status: matchPaymentStatus(paymentEl.Status), + Scheme: models.PaymentSchemeOther, + Amount: int64(paymentEl.Transfer.Amount.Amount * 100), + Asset: models.PaymentAsset(paymentEl.Transfer.Amount.Currency + "/2"), + RawData: raw, + }, } - return ingester.IngestPayments(ctx, batch, struct{}{}) + batch = append(batch, batchElement) } + + if err := ingester.IngestPayments(ctx, batch, struct{}{}); err != nil { + return err + } + + return nil } func matchPaymentStatus(paymentStatus string) models.PaymentStatus { diff --git a/components/payments/internal/app/connectors/bankingcircle/task_main.go b/components/payments/internal/app/connectors/bankingcircle/task_main.go new file mode 100644 index 000000000..0c92c270c --- /dev/null +++ b/components/payments/internal/app/connectors/bankingcircle/task_main.go @@ -0,0 +1,38 @@ +package bankingcircle + +import ( + "context" + "errors" + + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +// taskMain is the main task of the connector. It launches the other tasks. +func taskMain(logger logging.Logger) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + logger.Info(taskNameMain) + + taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch payments from client", + Key: taskNameFetchPayments, + }) + if err != nil { + return err + } + + err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return err + } + + return nil + } +} diff --git a/components/payments/internal/app/connectors/bankingcircle/task_resolve.go b/components/payments/internal/app/connectors/bankingcircle/task_resolve.go index 925763578..f4d9ddf33 100644 --- a/components/payments/internal/app/connectors/bankingcircle/task_resolve.go +++ b/components/payments/internal/app/connectors/bankingcircle/task_resolve.go @@ -3,14 +3,13 @@ package bankingcircle import ( "fmt" - "github.com/formancehq/payments/internal/app/models" - + "github.com/formancehq/payments/internal/app/connectors/bankingcircle/client" "github.com/formancehq/payments/internal/app/task" - "github.com/formancehq/stack/libs/go-libs/logging" ) const ( + taskNameMain = "main" taskNameFetchPayments = "fetch-payments" ) @@ -20,8 +19,8 @@ type TaskDescriptor struct { Key string `json:"key" yaml:"key" bson:"key"` } -func resolveTasks(logger logging.Logger, config Config) func(taskDefinition models.TaskDescriptor) task.Task { - bankingCircleClient, err := newClient( +func resolveTasks(logger logging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task { + bankingCircleClient, err := client.NewClient( config.Username, config.Password, config.Endpoint, @@ -36,15 +35,10 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition mode return nil } - return func(taskDefinition models.TaskDescriptor) task.Task { - taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](taskDefinition) - if err != nil { - logger.Error(err) - - return nil - } - + return func(taskDescriptor TaskDescriptor) task.Task { switch taskDescriptor.Key { + case taskNameMain: + return taskMain(logger) case taskNameFetchPayments: return taskFetchPayments(logger, bankingCircleClient) } diff --git a/components/payments/internal/app/connectors/currencycloud/client/auth.go b/components/payments/internal/app/connectors/currencycloud/client/auth.go index a045687cd..a5fda39a3 100644 --- a/components/payments/internal/app/connectors/currencycloud/client/auth.go +++ b/components/payments/internal/app/connectors/currencycloud/client/auth.go @@ -1,7 +1,6 @@ package client import ( - "context" "encoding/json" "fmt" "net/http" @@ -9,13 +8,13 @@ import ( "strings" ) -func (c *Client) authenticate(ctx context.Context) (string, error) { +func (c *Client) authenticate() (string, error) { form := make(url.Values) form.Add("login_id", c.loginID) form.Add("api_key", c.apiKey) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, + req, err := http.NewRequest(http.MethodPost, c.buildEndpoint("v2/authenticate/api"), strings.NewReader(form.Encode())) if err != nil { return "", fmt.Errorf("failed to create request: %w", err) diff --git a/components/payments/internal/app/connectors/currencycloud/client/client.go b/components/payments/internal/app/connectors/currencycloud/client/client.go index d402d85e7..dec82bcca 100644 --- a/components/payments/internal/app/connectors/currencycloud/client/client.go +++ b/components/payments/internal/app/connectors/currencycloud/client/client.go @@ -1,7 +1,6 @@ package client import ( - "context" "fmt" "net/http" @@ -48,7 +47,7 @@ func newHTTPClient() *http.Client { } // NewClient creates a new client for the CurrencyCloud API. -func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, error) { +func NewClient(loginID, apiKey, endpoint string) (*Client, error) { if endpoint == "" { endpoint = devAPIEndpoint } @@ -60,7 +59,7 @@ func NewClient(ctx context.Context, loginID, apiKey, endpoint string) (*Client, apiKey: apiKey, } - authToken, err := c.authenticate(ctx) + authToken, err := c.authenticate() if err != nil { return nil, err } diff --git a/components/payments/internal/app/connectors/currencycloud/config.go b/components/payments/internal/app/connectors/currencycloud/config.go index 3015566c5..2f71d7f6d 100644 --- a/components/payments/internal/app/connectors/currencycloud/config.go +++ b/components/payments/internal/app/connectors/currencycloud/config.go @@ -3,16 +3,16 @@ package currencycloud import ( "encoding/json" "fmt" - "time" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) type Config struct { - LoginID string `json:"loginID" bson:"loginID"` - APIKey string `json:"apiKey" bson:"apiKey"` - Endpoint string `json:"endpoint" bson:"endpoint"` - PollingPeriod Duration `json:"pollingPeriod" bson:"pollingPeriod"` + LoginID string `json:"loginID" bson:"loginID"` + APIKey string `json:"apiKey" bson:"apiKey"` + Endpoint string `json:"endpoint" bson:"endpoint"` + PollingPeriod connectors.Duration `json:"pollingPeriod" bson:"pollingPeriod"` } // String obfuscates sensitive fields and returns a string representation of the config. @@ -30,10 +30,6 @@ func (c Config) Validate() error { return ErrMissingLoginID } - if c.PollingPeriod == 0 { - return ErrMissingPollingPeriod - } - return nil } @@ -41,53 +37,13 @@ func (c Config) Marshal() ([]byte, error) { return json.Marshal(c) } -type Duration time.Duration - -func (d *Duration) String() string { - return time.Duration(*d).String() -} - -func (d *Duration) Duration() time.Duration { - return time.Duration(*d) -} - -func (d *Duration) MarshalJSON() ([]byte, error) { - return json.Marshal(time.Duration(*d).String()) -} - -func (d *Duration) UnmarshalJSON(b []byte) error { - var durationValue interface{} - - if err := json.Unmarshal(b, &durationValue); err != nil { - return err - } - - switch value := durationValue.(type) { - case float64: - *d = Duration(time.Duration(value)) - - return nil - case string: - tmp, err := time.ParseDuration(value) - if err != nil { - return err - } - - *d = Duration(tmp) - - return nil - default: - return ErrDurationInvalid - } -} - func (c Config) BuildTemplate() (string, configtemplate.Config) { cfg := configtemplate.NewConfig() cfg.AddParameter("loginID", configtemplate.TypeString, true) cfg.AddParameter("apiKey", configtemplate.TypeString, true) cfg.AddParameter("endpoint", configtemplate.TypeString, false) - cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, true) + cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, false) return Name.String(), cfg } diff --git a/components/payments/internal/app/connectors/currencycloud/connector.go b/components/payments/internal/app/connectors/currencycloud/connector.go index b9c56f9b1..b76264849 100644 --- a/components/payments/internal/app/connectors/currencycloud/connector.go +++ b/components/payments/internal/app/connectors/currencycloud/connector.go @@ -23,20 +23,37 @@ func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models. } func (c *Connector) Install(ctx task.ConnectorContext) error { - taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{Name: taskNameFetchTransactions}) + taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Main task to periodically fetch users and transactions", + Key: taskNameMain, + }) if err != nil { return err } - return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, true) + return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, models.TaskSchedulerOptions{ + // We want to polling every c.cfg.PollingPeriod.Duration seconds the users + // and their transactions. + ScheduleOption: models.OPTIONS_RUN_INDEFINITELY, + Duration: c.cfg.PollingPeriod.Duration, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) } func (c *Connector) Uninstall(ctx context.Context) error { + return nil } func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task { - return resolveTasks(c.logger, c.cfg) + taskDescriptor, err := models.DecodeTaskDescriptor[TaskDescriptor](descriptor) + if err != nil { + panic(err) + } + + return resolveTasks(c.logger, c.cfg)(taskDescriptor) } var _ integration.Connector = &Connector{} diff --git a/components/payments/internal/app/connectors/currencycloud/loader.go b/components/payments/internal/app/connectors/currencycloud/loader.go index 514ca4d33..44c71c988 100644 --- a/components/payments/internal/app/connectors/currencycloud/loader.go +++ b/components/payments/internal/app/connectors/currencycloud/loader.go @@ -1,6 +1,8 @@ package currencycloud import ( + "time" + "github.com/formancehq/payments/internal/app/integration" "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/stack/libs/go-libs/logging" @@ -23,6 +25,10 @@ func (l *Loader) Load(logger logging.Logger, config Config) integration.Connecto } func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PollingPeriod.Duration == 0 { + cfg.PollingPeriod.Duration = 2 * time.Minute + } + return cfg } diff --git a/components/payments/internal/app/connectors/currencycloud/task_fetch_transactions.go b/components/payments/internal/app/connectors/currencycloud/task_fetch_transactions.go index ea9ce7f3d..53e3f97ff 100644 --- a/components/payments/internal/app/connectors/currencycloud/task_fetch_transactions.go +++ b/components/payments/internal/app/connectors/currencycloud/task_fetch_transactions.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "strconv" - "time" "github.com/formancehq/payments/internal/app/models" @@ -22,16 +21,7 @@ func taskFetchTransactions(logger logging.Logger, client *client.Client, config ctx context.Context, ingester ingestion.Ingester, ) error { - for { - select { - case <-ctx.Done(): - return nil - case <-time.After(config.PollingPeriod.Duration()): - if err := ingestTransactions(ctx, logger, client, ingester); err != nil { - return err - } - } - } + return ingestTransactions(ctx, logger, client, ingester) } } diff --git a/components/payments/internal/app/connectors/currencycloud/task_main.go b/components/payments/internal/app/connectors/currencycloud/task_main.go new file mode 100644 index 000000000..29910393e --- /dev/null +++ b/components/payments/internal/app/connectors/currencycloud/task_main.go @@ -0,0 +1,38 @@ +package currencycloud + +import ( + "context" + "errors" + + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +// taskMain is the main task of the connector. It launches the other tasks. +func taskMain(logger logging.Logger) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + logger.Info(taskNameMain) + + taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch transactions from client", + Key: taskNameFetchTransactions, + }) + if err != nil { + return err + } + + err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return err + } + + return nil + } +} diff --git a/components/payments/internal/app/connectors/currencycloud/task_resolve.go b/components/payments/internal/app/connectors/currencycloud/task_resolve.go index c7161f15e..ae0b8e913 100644 --- a/components/payments/internal/app/connectors/currencycloud/task_resolve.go +++ b/components/payments/internal/app/connectors/currencycloud/task_resolve.go @@ -1,7 +1,6 @@ package currencycloud import ( - "context" "fmt" "github.com/formancehq/payments/internal/app/connectors/currencycloud/client" @@ -12,26 +11,36 @@ import ( ) const ( + taskNameMain = "main" taskNameFetchTransactions = "fetch-transactions" ) // TaskDescriptor is the definition of a task. type TaskDescriptor struct { Name string `json:"name" yaml:"name" bson:"name"` + Key string `json:"key" yaml:"key" bson:"key"` } -func resolveTasks(logger logging.Logger, config Config) task.Task { - return func(ctx context.Context, taskDescriptor TaskDescriptor) task.Task { - currencyCloudClient, err := client.NewClient(ctx, config.LoginID, config.APIKey, config.Endpoint) - if err != nil { - return func(ctx context.Context, taskDefinition TaskDescriptor) task.Task { - return func() error { - return fmt.Errorf("failed to initiate client: %w", err) - } +func resolveTasks(logger logging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task { + currencyCloudClient, err := client.NewClient(config.LoginID, config.APIKey, config.Endpoint) + if err != nil { + return func(taskDefinition TaskDescriptor) task.Task { + return func() error { + return fmt.Errorf("failed to initiate client: %w", err) } } + } + + return func(taskDescriptor TaskDescriptor) task.Task { + if taskDescriptor.Key == "" { + // Keep the compatibility with previous version if the connector. + // If the key is empty, use the name as the key. + taskDescriptor.Key = taskDescriptor.Name + } - switch taskDescriptor.Name { + switch taskDescriptor.Key { + case taskNameMain: + return taskMain(logger) case taskNameFetchTransactions: return taskFetchTransactions(logger, currencyCloudClient, config) } diff --git a/components/payments/internal/app/connectors/dummypay/connector.go b/components/payments/internal/app/connectors/dummypay/connector.go index 81fdbac3d..e79abbfb8 100644 --- a/components/payments/internal/app/connectors/dummypay/connector.go +++ b/components/payments/internal/app/connectors/dummypay/connector.go @@ -36,7 +36,12 @@ func (c *Connector) Install(ctx task.ConnectorContext) error { return fmt.Errorf("failed to create read files task descriptor: %w", err) } - if err = ctx.Scheduler().Schedule(ctx.Context(), readFilesDescriptor, true); err != nil { + if err = ctx.Scheduler().Schedule(ctx.Context(), readFilesDescriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }); err != nil { return fmt.Errorf("failed to schedule task to read files: %w", err) } @@ -45,7 +50,10 @@ func (c *Connector) Install(ctx task.ConnectorContext) error { return fmt.Errorf("failed to create generate files task descriptor: %w", err) } - if err = ctx.Scheduler().Schedule(ctx.Context(), generateFilesDescriptor, true); err != nil { + if err = ctx.Scheduler().Schedule(ctx.Context(), generateFilesDescriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }); err != nil { return fmt.Errorf("failed to schedule task to generate files: %w", err) } diff --git a/components/payments/internal/app/connectors/dummypay/connector_test.go b/components/payments/internal/app/connectors/dummypay/connector_test.go index a2266b78e..dbfcc2445 100644 --- a/components/payments/internal/app/connectors/dummypay/connector_test.go +++ b/components/payments/internal/app/connectors/dummypay/connector_test.go @@ -25,7 +25,7 @@ func (mcc *mockConnectorContext) Context() context.Context { return mcc.ctx } -func (mcc mockScheduler) Schedule(ctx context.Context, p models.TaskDescriptor, restart bool) error { +func (mcc mockScheduler) Schedule(ctx context.Context, p models.TaskDescriptor, options models.TaskSchedulerOptions) error { return nil } diff --git a/components/payments/internal/app/connectors/dummypay/task_read_files.go b/components/payments/internal/app/connectors/dummypay/task_read_files.go index 1ced9082f..0d073e125 100644 --- a/components/payments/internal/app/connectors/dummypay/task_read_files.go +++ b/components/payments/internal/app/connectors/dummypay/task_read_files.go @@ -46,7 +46,10 @@ func taskReadFiles(config Config, fs fs) task.Task { } // schedule a task to ingest the file into the payments system. - err = scheduler.Schedule(ctx, descriptor, true) + err = scheduler.Schedule(ctx, descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) if err != nil { return fmt.Errorf("failed to schedule task to ingest file '%s': %w", file, err) } diff --git a/components/payments/internal/app/connectors/mangopay/client/transactions.go b/components/payments/internal/app/connectors/mangopay/client/transactions.go index 4c3b29d4e..56dcba232 100644 --- a/components/payments/internal/app/connectors/mangopay/client/transactions.go +++ b/components/payments/internal/app/connectors/mangopay/client/transactions.go @@ -7,7 +7,7 @@ import ( "net/http" ) -type payment struct { +type Payment struct { Id string `json:"Id"` Tag string `json:"Tag"` CreationDate int64 `json:"CreationDate"` @@ -35,26 +35,7 @@ type payment struct { DebitedWalletID string `json:"DebitedWalletId"` } -func (c *Client) GetAllTransactions(ctx context.Context, userID string) ([]*payment, error) { - var payments []*payment - - for page := 1; ; page++ { - pagedPayments, err := c.getTransactions(ctx, userID, page) - if err != nil { - return nil, err - } - - if len(pagedPayments) == 0 { - break - } - - payments = append(payments, pagedPayments...) - } - - return payments, nil -} - -func (c *Client) getTransactions(ctx context.Context, userID string, page int) ([]*payment, error) { +func (c *Client) GetTransactions(ctx context.Context, userID string, page int) ([]*Payment, error) { endpoint := fmt.Sprintf("%s/v2.01/%s/users/%s/transactions", c.endpoint, c.clientID, userID) req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, http.NoBody) if err != nil { @@ -78,7 +59,7 @@ func (c *Client) getTransactions(ctx context.Context, userID string, page int) ( } }() - var payments []*payment + var payments []*Payment if err := json.NewDecoder(resp.Body).Decode(&payments); err != nil { return nil, fmt.Errorf("failed to unmarshal login response body: %w", err) } diff --git a/components/payments/internal/app/connectors/mangopay/config.go b/components/payments/internal/app/connectors/mangopay/config.go index 7c97b776b..22ebf6f25 100644 --- a/components/payments/internal/app/connectors/mangopay/config.go +++ b/components/payments/internal/app/connectors/mangopay/config.go @@ -4,13 +4,15 @@ import ( "encoding/json" "fmt" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) type Config struct { - ClientID string `json:"clientID" yaml:"clientID" bson:"clientID"` - APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` - Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` + ClientID string `json:"clientID" yaml:"clientID" bson:"clientID"` + APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` + Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` + PollingPeriod connectors.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` } func (c Config) String() string { @@ -43,6 +45,7 @@ func (c Config) BuildTemplate() (string, configtemplate.Config) { cfg.AddParameter("clientID", configtemplate.TypeString, true) cfg.AddParameter("apiKey", configtemplate.TypeString, true) cfg.AddParameter("endpoint", configtemplate.TypeString, true) + cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, false) return Name.String(), cfg } diff --git a/components/payments/internal/app/connectors/mangopay/connector.go b/components/payments/internal/app/connectors/mangopay/connector.go index 900105bf1..933de63df 100644 --- a/components/payments/internal/app/connectors/mangopay/connector.go +++ b/components/payments/internal/app/connectors/mangopay/connector.go @@ -23,14 +23,22 @@ func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models. func (c *Connector) Install(ctx task.ConnectorContext) error { taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ - Name: "Fetch users from client", - Key: taskNameFetchUsers, + Name: "Main task to periodically fetch users and transactions", + Key: taskNameMain, }) if err != nil { return err } - return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, true) + return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, models.TaskSchedulerOptions{ + // We want to polling every c.cfg.PollingPeriod.Duration seconds the users + // and their transactions. + ScheduleOption: models.OPTIONS_RUN_INDEFINITELY, + Duration: c.cfg.PollingPeriod.Duration, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) } func (c *Connector) Uninstall(ctx context.Context) error { diff --git a/components/payments/internal/app/connectors/mangopay/loader.go b/components/payments/internal/app/connectors/mangopay/loader.go index 91255128e..db93eb568 100644 --- a/components/payments/internal/app/connectors/mangopay/loader.go +++ b/components/payments/internal/app/connectors/mangopay/loader.go @@ -1,6 +1,8 @@ package mangopay import ( + "time" + "github.com/formancehq/payments/internal/app/integration" "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/stack/libs/go-libs/logging" @@ -23,6 +25,10 @@ func (l *Loader) Load(logger logging.Logger, config Config) integration.Connecto } func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PollingPeriod.Duration == 0 { + cfg.PollingPeriod.Duration = 2 * time.Minute + } + return cfg } diff --git a/components/payments/internal/app/connectors/mangopay/task_fetch_transactions.go b/components/payments/internal/app/connectors/mangopay/task_fetch_transactions.go index 93e1245e3..239aa2998 100644 --- a/components/payments/internal/app/connectors/mangopay/task_fetch_transactions.go +++ b/components/payments/internal/app/connectors/mangopay/task_fetch_transactions.go @@ -21,47 +21,63 @@ func taskFetchTransactions(logger logging.Logger, client *client.Client, userID ) error { logger.Info("Fetching transactions for user", userID) - transactions, err := client.GetAllTransactions(ctx, userID) - if err != nil { - return err - } + for page := 1; ; page++ { + pagedPayments, err := client.GetTransactions(ctx, userID, page) + if err != nil { + return err + } - batch := ingestion.PaymentBatch{} - for _, transaction := range transactions { - logger.Info(transaction) + if len(pagedPayments) == 0 { + break + } - rawData, err := json.Marshal(transaction) - if err != nil { - return fmt.Errorf("failed to marshal transaction: %w", err) + if err := ingestBatch(ctx, ingester, pagedPayments); err != nil { + return err } + } + + return nil + } +} + +func ingestBatch( + ctx context.Context, + ingester ingestion.Ingester, + payments []*client.Payment, +) error { + batch := ingestion.PaymentBatch{} + for _, payment := range payments { + rawData, err := json.Marshal(payment) + if err != nil { + return fmt.Errorf("failed to marshal transaction: %w", err) + } - paymentType := matchPaymentType(transaction.Type) + paymentType := matchPaymentType(payment.Type) - batchElement := ingestion.PaymentBatchElement{ - Payment: &models.Payment{ - ID: models.PaymentID{ - PaymentReference: models.PaymentReference{ - Reference: transaction.Id, - Type: paymentType, - }, - Provider: models.ConnectorProviderMangopay, + batchElement := ingestion.PaymentBatchElement{ + Payment: &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: payment.Id, + Type: paymentType, }, - CreatedAt: time.Unix(transaction.CreationDate, 0), - Reference: transaction.Id, - Amount: transaction.DebitedFunds.Amount, - Type: paymentType, - Status: matchPaymentStatus(transaction.Status), - Scheme: models.PaymentSchemeOther, - Asset: currency.FormatAsset(transaction.DebitedFunds.Currency), - RawData: rawData, + Provider: models.ConnectorProviderMangopay, }, - } - - batch = append(batch, batchElement) + CreatedAt: time.Unix(payment.CreationDate, 0), + Reference: payment.Id, + Amount: payment.DebitedFunds.Amount, + Type: paymentType, + Status: matchPaymentStatus(payment.Status), + Scheme: models.PaymentSchemeOther, + Asset: currency.FormatAsset(payment.DebitedFunds.Currency), + RawData: rawData, + }, } - return ingester.IngestPayments(ctx, batch, struct{}{}) + batch = append(batch, batchElement) } + + return ingester.IngestPayments(ctx, batch, struct{}{}) } func matchPaymentType(paymentType string) models.PaymentType { diff --git a/components/payments/internal/app/connectors/mangopay/task_fetch_users.go b/components/payments/internal/app/connectors/mangopay/task_fetch_users.go index 437ce589f..5db2f7363 100644 --- a/components/payments/internal/app/connectors/mangopay/task_fetch_users.go +++ b/components/payments/internal/app/connectors/mangopay/task_fetch_users.go @@ -2,6 +2,7 @@ package mangopay import ( "context" + "errors" "github.com/formancehq/payments/internal/app/connectors/mangopay/client" "github.com/formancehq/payments/internal/app/models" @@ -33,8 +34,11 @@ func taskFetchUsers(logger logging.Logger, client *client.Client) task.Task { return err } - err = scheduler.Schedule(ctx, transactionsTask, false) - if err != nil { + err = scheduler.Schedule(ctx, transactionsTask, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { return err } } diff --git a/components/payments/internal/app/connectors/mangopay/task_main.go b/components/payments/internal/app/connectors/mangopay/task_main.go new file mode 100644 index 000000000..194749aec --- /dev/null +++ b/components/payments/internal/app/connectors/mangopay/task_main.go @@ -0,0 +1,38 @@ +package mangopay + +import ( + "context" + "errors" + + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +// taskMain is the main task of the connector. It launches the other tasks. +func taskMain(logger logging.Logger) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + logger.Info(taskNameMain) + + taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch users from client", + Key: taskNameFetchUsers, + }) + if err != nil { + return err + } + + err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return err + } + + return nil + } +} diff --git a/components/payments/internal/app/connectors/mangopay/task_resolve.go b/components/payments/internal/app/connectors/mangopay/task_resolve.go index 7f957df6f..979658a78 100644 --- a/components/payments/internal/app/connectors/mangopay/task_resolve.go +++ b/components/payments/internal/app/connectors/mangopay/task_resolve.go @@ -3,21 +3,24 @@ package mangopay import ( "fmt" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/mangopay/client" "github.com/formancehq/payments/internal/app/task" "github.com/formancehq/stack/libs/go-libs/logging" ) const ( + taskNameMain = "main" taskNameFetchUsers = "fetch-users" taskNameFetchTransactions = "fetch-transactions" ) // TaskDescriptor is the definition of a task. type TaskDescriptor struct { - Name string `json:"name" yaml:"name" bson:"name"` - Key string `json:"key" yaml:"key" bson:"key"` - UserID string `json:"userID" yaml:"userID" bson:"userID"` + Name string `json:"name" yaml:"name" bson:"name"` + Key string `json:"key" yaml:"key" bson:"key"` + UserID string `json:"userID" yaml:"userID" bson:"userID"` + PollingPeriod connectors.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` } // clientID, apiKey, endpoint string, logger logging @@ -36,6 +39,8 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task return func(taskDescriptor TaskDescriptor) task.Task { switch taskDescriptor.Key { + case taskNameMain: + return taskMain(logger) case taskNameFetchUsers: return taskFetchUsers(logger, mangopayClient) case taskNameFetchTransactions: diff --git a/components/payments/internal/app/connectors/modulr/config.go b/components/payments/internal/app/connectors/modulr/config.go index b2ca26369..bd3547a55 100644 --- a/components/payments/internal/app/connectors/modulr/config.go +++ b/components/payments/internal/app/connectors/modulr/config.go @@ -4,13 +4,15 @@ import ( "encoding/json" "fmt" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) type Config struct { - APIKey string `json:"apiKey" bson:"apiKey"` - APISecret string `json:"apiSecret" bson:"apiSecret"` - Endpoint string `json:"endpoint" bson:"endpoint"` + APIKey string `json:"apiKey" bson:"apiKey"` + APISecret string `json:"apiSecret" bson:"apiSecret"` + Endpoint string `json:"endpoint" bson:"endpoint"` + PollingPeriod connectors.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` } // String obfuscates sensitive fields and returns a string representation of the config. @@ -41,6 +43,7 @@ func (c Config) BuildTemplate() (string, configtemplate.Config) { cfg.AddParameter("apiKey", configtemplate.TypeString, true) cfg.AddParameter("apiSecret", configtemplate.TypeString, true) cfg.AddParameter("endpoint", configtemplate.TypeString, false) + cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, false) return Name.String(), cfg } diff --git a/components/payments/internal/app/connectors/modulr/connector.go b/components/payments/internal/app/connectors/modulr/connector.go index 77ed3d338..946195548 100644 --- a/components/payments/internal/app/connectors/modulr/connector.go +++ b/components/payments/internal/app/connectors/modulr/connector.go @@ -24,14 +24,22 @@ func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models. func (c *Connector) Install(ctx task.ConnectorContext) error { taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ - Name: "Fetch accounts from client", - Key: taskNameFetchAccounts, + Name: "Main task to periodically fetch accounts and transactions", + Key: taskNameMain, }) if err != nil { return err } - return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, false) + return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, models.TaskSchedulerOptions{ + // We want to polling every c.cfg.PollingPeriod.Duration seconds the users + // and their transactions. + ScheduleOption: models.OPTIONS_RUN_INDEFINITELY, + Duration: c.cfg.PollingPeriod.Duration, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) } func (c *Connector) Uninstall(ctx context.Context) error { diff --git a/components/payments/internal/app/connectors/modulr/loader.go b/components/payments/internal/app/connectors/modulr/loader.go index 456d49a2d..425d649ae 100644 --- a/components/payments/internal/app/connectors/modulr/loader.go +++ b/components/payments/internal/app/connectors/modulr/loader.go @@ -1,6 +1,8 @@ package modulr import ( + "time" + "github.com/formancehq/payments/internal/app/integration" "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/stack/libs/go-libs/logging" @@ -23,6 +25,10 @@ func (l *Loader) Load(logger logging.Logger, config Config) integration.Connecto } func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PollingPeriod.Duration == 0 { + cfg.PollingPeriod.Duration = 2 * time.Minute + } + return cfg } diff --git a/components/payments/internal/app/connectors/modulr/task_fetch_accounts.go b/components/payments/internal/app/connectors/modulr/task_fetch_accounts.go index fa58b1c15..50000be54 100644 --- a/components/payments/internal/app/connectors/modulr/task_fetch_accounts.go +++ b/components/payments/internal/app/connectors/modulr/task_fetch_accounts.go @@ -2,6 +2,7 @@ package modulr import ( "context" + "errors" "github.com/formancehq/payments/internal/app/models" @@ -35,8 +36,11 @@ func taskFetchAccounts(logger logging.Logger, client *client.Client) task.Task { return err } - err = scheduler.Schedule(ctx, transactionsTask, false) - if err != nil { + err = scheduler.Schedule(ctx, transactionsTask, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { return err } } diff --git a/components/payments/internal/app/connectors/modulr/task_main.go b/components/payments/internal/app/connectors/modulr/task_main.go new file mode 100644 index 000000000..578aef78a --- /dev/null +++ b/components/payments/internal/app/connectors/modulr/task_main.go @@ -0,0 +1,38 @@ +package modulr + +import ( + "context" + "errors" + + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +// taskMain is the main task of the connector. It launches the other tasks. +func taskMain(logger logging.Logger) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + logger.Info(taskNameMain) + + taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch users from client", + Key: taskNameFetchAccounts, + }) + if err != nil { + return err + } + + err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return err + } + + return nil + } +} diff --git a/components/payments/internal/app/connectors/modulr/task_resolve.go b/components/payments/internal/app/connectors/modulr/task_resolve.go index 96e8e8721..c3668bc9f 100644 --- a/components/payments/internal/app/connectors/modulr/task_resolve.go +++ b/components/payments/internal/app/connectors/modulr/task_resolve.go @@ -10,6 +10,7 @@ import ( ) const ( + taskNameMain = "main" taskNameFetchTransactions = "fetch-transactions" taskNameFetchAccounts = "fetch-accounts" ) @@ -33,6 +34,8 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task return func(taskDefinition TaskDescriptor) task.Task { switch taskDefinition.Key { + case taskNameMain: + return taskMain(logger) case taskNameFetchAccounts: return taskFetchAccounts(logger, modulrClient) case taskNameFetchTransactions: diff --git a/components/payments/internal/app/connectors/moneycorp/client/accounts.go b/components/payments/internal/app/connectors/moneycorp/client/accounts.go index 5ca42ff6b..b0ed374fe 100644 --- a/components/payments/internal/app/connectors/moneycorp/client/accounts.go +++ b/components/payments/internal/app/connectors/moneycorp/client/accounts.go @@ -8,6 +8,10 @@ import ( "strconv" ) +const ( + pageSize = 100 +) + type accountsResponse struct { Accounts []account `json:"data"` } diff --git a/components/payments/internal/app/connectors/moneycorp/client/auth.go b/components/payments/internal/app/connectors/moneycorp/client/auth.go index cd1c414a2..05c669683 100644 --- a/components/payments/internal/app/connectors/moneycorp/client/auth.go +++ b/components/payments/internal/app/connectors/moneycorp/client/auth.go @@ -39,7 +39,7 @@ func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { } func (t *apiTransport) ensureAccessTokenIsValid(ctx context.Context) error { - if t.accessTokenExpiresAt.After(time.Now()) { + if t.accessTokenExpiresAt.After(time.Now().Add(5 * time.Second)) { return nil } diff --git a/components/payments/internal/app/connectors/moneycorp/client/client.go b/components/payments/internal/app/connectors/moneycorp/client/client.go index c2f839ab4..0ae7301fb 100644 --- a/components/payments/internal/app/connectors/moneycorp/client/client.go +++ b/components/payments/internal/app/connectors/moneycorp/client/client.go @@ -9,10 +9,6 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) -const ( - pageSize = 1000 -) - type Client struct { httpClient *http.Client diff --git a/components/payments/internal/app/connectors/moneycorp/client/transactions.go b/components/payments/internal/app/connectors/moneycorp/client/transactions.go index feaa40c3b..dc0ba7511 100644 --- a/components/payments/internal/app/connectors/moneycorp/client/transactions.go +++ b/components/payments/internal/app/connectors/moneycorp/client/transactions.go @@ -9,10 +9,10 @@ import ( ) type transactionsResponse struct { - Transactions []*transaction `json:"data"` + Transactions []*Transaction `json:"data"` } -type transaction struct { +type Transaction struct { ID string `json:"id"` Type string `json:"type"` Attributes struct { @@ -27,30 +27,7 @@ type transaction struct { } `json:"attributes"` } -func (c *Client) GetAllTransactions(ctx context.Context, accountID string) ([]*transaction, error) { - var transactions []*transaction - - for page := 0; ; page++ { - pagedTransactions, err := c.getTransactions(ctx, accountID, page, pageSize) - if err != nil { - return nil, err - } - - if len(pagedTransactions) == 0 { - break - } - - transactions = append(transactions, pagedTransactions...) - - if len(pagedTransactions) < pageSize { - break - } - } - - return transactions, nil -} - -func (c *Client) getTransactions(ctx context.Context, accountID string, page, pageSize int) ([]*transaction, error) { +func (c *Client) GetTransactions(ctx context.Context, accountID string, page, pageSize int) ([]*Transaction, error) { endpoint := fmt.Sprintf("%s/accounts/%s/transactions/find", c.endpoint, accountID) req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, http.NoBody) if err != nil { diff --git a/components/payments/internal/app/connectors/moneycorp/config.go b/components/payments/internal/app/connectors/moneycorp/config.go index 5bb6983de..cead5ac95 100644 --- a/components/payments/internal/app/connectors/moneycorp/config.go +++ b/components/payments/internal/app/connectors/moneycorp/config.go @@ -4,13 +4,15 @@ import ( "encoding/json" "fmt" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) type Config struct { - ClientID string `json:"clientID" yaml:"clientID" bson:"clientID"` - APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` - Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` + ClientID string `json:"clientID" yaml:"clientID" bson:"clientID"` + APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` + Endpoint string `json:"endpoint" yaml:"endpoint" bson:"endpoint"` + PollingPeriod connectors.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` } func (c Config) String() string { @@ -43,6 +45,7 @@ func (c Config) BuildTemplate() (string, configtemplate.Config) { cfg.AddParameter("clientID", configtemplate.TypeString, true) cfg.AddParameter("apiKey", configtemplate.TypeString, true) cfg.AddParameter("endpoint", configtemplate.TypeString, true) + cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, false) return Name.String(), cfg } diff --git a/components/payments/internal/app/connectors/moneycorp/connector.go b/components/payments/internal/app/connectors/moneycorp/connector.go index e1f115bb8..cf5e53c7a 100644 --- a/components/payments/internal/app/connectors/moneycorp/connector.go +++ b/components/payments/internal/app/connectors/moneycorp/connector.go @@ -23,14 +23,22 @@ func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models. func (c *Connector) Install(ctx task.ConnectorContext) error { taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ - Name: "Fetch users from client", - Key: taskNameFetchAccounts, + Name: "Main task to periodically fetch accounts and transactions", + Key: taskNameMain, }) if err != nil { return err } - return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, true) + return ctx.Scheduler().Schedule(ctx.Context(), taskDescriptor, models.TaskSchedulerOptions{ + // We want to polling every c.cfg.PollingPeriod.Duration seconds the users + // and their transactions. + ScheduleOption: models.OPTIONS_RUN_INDEFINITELY, + Duration: c.cfg.PollingPeriod.Duration, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) } func (c *Connector) Uninstall(ctx context.Context) error { diff --git a/components/payments/internal/app/connectors/moneycorp/loader.go b/components/payments/internal/app/connectors/moneycorp/loader.go index 838fd323c..7554165f6 100644 --- a/components/payments/internal/app/connectors/moneycorp/loader.go +++ b/components/payments/internal/app/connectors/moneycorp/loader.go @@ -1,6 +1,8 @@ package moneycorp import ( + "time" + "github.com/formancehq/payments/internal/app/integration" "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/stack/libs/go-libs/logging" @@ -23,6 +25,10 @@ func (l *Loader) Load(logger logging.Logger, config Config) integration.Connecto } func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PollingPeriod.Duration == 0 { + cfg.PollingPeriod.Duration = 2 * time.Minute + } + return cfg } diff --git a/components/payments/internal/app/connectors/moneycorp/task_fetch_accounts.go b/components/payments/internal/app/connectors/moneycorp/task_fetch_accounts.go index af32af10d..37d69e5cc 100644 --- a/components/payments/internal/app/connectors/moneycorp/task_fetch_accounts.go +++ b/components/payments/internal/app/connectors/moneycorp/task_fetch_accounts.go @@ -2,6 +2,7 @@ package moneycorp import ( "context" + "errors" "github.com/formancehq/payments/internal/app/connectors/moneycorp/client" "github.com/formancehq/payments/internal/app/models" @@ -33,8 +34,11 @@ func taskFetchAccounts(logger logging.Logger, client *client.Client) task.Task { return err } - err = scheduler.Schedule(ctx, transactionsTask, false) - if err != nil { + err = scheduler.Schedule(ctx, transactionsTask, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { return err } } diff --git a/components/payments/internal/app/connectors/moneycorp/task_fetch_transactions.go b/components/payments/internal/app/connectors/moneycorp/task_fetch_transactions.go index 612b984e3..09bc41b0e 100644 --- a/components/payments/internal/app/connectors/moneycorp/task_fetch_transactions.go +++ b/components/payments/internal/app/connectors/moneycorp/task_fetch_transactions.go @@ -15,6 +15,10 @@ import ( "github.com/formancehq/stack/libs/go-libs/logging" ) +const ( + pageSize = 100 +) + func taskFetchTransactions(logger logging.Logger, client *client.Client, accountID string) task.Task { return func( ctx context.Context, @@ -22,55 +26,75 @@ func taskFetchTransactions(logger logging.Logger, client *client.Client, account ) error { logger.Info("Fetching transactions for account", accountID) - transactions, err := client.GetAllTransactions(ctx, accountID) - if err != nil { - return err - } - - batch := ingestion.PaymentBatch{} - for _, transaction := range transactions { - logger.Info(transaction) - - rawData, err := json.Marshal(transaction) + for page := 0; ; page++ { + pagedTransactions, err := client.GetTransactions(ctx, accountID, page, pageSize) if err != nil { - return fmt.Errorf("failed to marshal transaction: %w", err) + return err } - paymentType, shouldBeRecorded := matchPaymentType(transaction.Attributes.Type, transaction.Attributes.Direction) - if !shouldBeRecorded { - continue + if len(pagedTransactions) == 0 { + break } - createdAt, err := time.Parse("2006-01-02T15:04:05.999999999", transaction.Attributes.CreatedAt) - if err != nil { - return fmt.Errorf("failed to parse transaction date: %w", err) + if err := ingestBatch(ctx, ingester, pagedTransactions); err != nil { + return err } - batchElement := ingestion.PaymentBatchElement{ - Payment: &models.Payment{ - ID: models.PaymentID{ - PaymentReference: models.PaymentReference{ - Reference: transaction.ID, - Type: paymentType, - }, - Provider: models.ConnectorProviderMoneycorp, - }, - CreatedAt: createdAt, - Reference: transaction.ID, - Amount: int64(transaction.Attributes.Amount * math.Pow(10, float64(currency.GetPrecision(transaction.Attributes.Currency)))), - Asset: currency.FormatAsset(transaction.Attributes.Currency), - Type: paymentType, - Status: models.PaymentStatusSucceeded, - Scheme: models.PaymentSchemeOther, - RawData: rawData, - }, + if len(pagedTransactions) < pageSize { + break } + } + + return nil + } +} + +func ingestBatch( + ctx context.Context, + ingester ingestion.Ingester, + transactions []*client.Transaction, +) error { + batch := ingestion.PaymentBatch{} + for _, transaction := range transactions { + rawData, err := json.Marshal(transaction) + if err != nil { + return fmt.Errorf("failed to marshal transaction: %w", err) + } + + paymentType, shouldBeRecorded := matchPaymentType(transaction.Attributes.Type, transaction.Attributes.Direction) + if !shouldBeRecorded { + continue + } - batch = append(batch, batchElement) + createdAt, err := time.Parse("2006-01-02T15:04:05.999999999", transaction.Attributes.CreatedAt) + if err != nil { + return fmt.Errorf("failed to parse transaction date: %w", err) + } + + batchElement := ingestion.PaymentBatchElement{ + Payment: &models.Payment{ + ID: models.PaymentID{ + PaymentReference: models.PaymentReference{ + Reference: transaction.ID, + Type: paymentType, + }, + Provider: models.ConnectorProviderMoneycorp, + }, + CreatedAt: createdAt, + Reference: transaction.ID, + Amount: int64(transaction.Attributes.Amount * math.Pow(10, float64(currency.GetPrecision(transaction.Attributes.Currency)))), + Asset: currency.FormatAsset(transaction.Attributes.Currency), + Type: paymentType, + Status: models.PaymentStatusSucceeded, + Scheme: models.PaymentSchemeOther, + RawData: rawData, + }, } - return ingester.IngestPayments(ctx, batch, struct{}{}) + batch = append(batch, batchElement) } + + return ingester.IngestPayments(ctx, batch, struct{}{}) } func matchPaymentType(transactionType string, transactionDirection string) (models.PaymentType, bool) { diff --git a/components/payments/internal/app/connectors/moneycorp/task_main.go b/components/payments/internal/app/connectors/moneycorp/task_main.go new file mode 100644 index 000000000..b6370c494 --- /dev/null +++ b/components/payments/internal/app/connectors/moneycorp/task_main.go @@ -0,0 +1,38 @@ +package moneycorp + +import ( + "context" + "errors" + + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +// taskMain is the main task of the connector. It launches the other tasks. +func taskMain(logger logging.Logger) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + logger.Info(taskNameMain) + + taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch accounts from client", + Key: taskNameFetchAccounts, + }) + if err != nil { + return err + } + + err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return err + } + + return nil + } +} diff --git a/components/payments/internal/app/connectors/moneycorp/task_resolve.go b/components/payments/internal/app/connectors/moneycorp/task_resolve.go index b7139b151..a616ee372 100644 --- a/components/payments/internal/app/connectors/moneycorp/task_resolve.go +++ b/components/payments/internal/app/connectors/moneycorp/task_resolve.go @@ -9,6 +9,7 @@ import ( ) const ( + taskNameMain = "main" taskNameFetchAccounts = "fetch-accounts" taskNameFetchTransactions = "fetch-transactions" ) @@ -36,6 +37,8 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task return func(taskDescriptor TaskDescriptor) task.Task { switch taskDescriptor.Key { + case taskNameMain: + return taskMain(logger) case taskNameFetchAccounts: return taskFetchAccounts(logger, moneycorpClient) case taskNameFetchTransactions: diff --git a/components/payments/internal/app/connectors/stripe/connector.go b/components/payments/internal/app/connectors/stripe/connector.go index 0e930f3ec..53a12647d 100644 --- a/components/payments/internal/app/connectors/stripe/connector.go +++ b/components/payments/internal/app/connectors/stripe/connector.go @@ -29,7 +29,12 @@ func (c *Connector) Install(ctx task.ConnectorContext) error { return err } - return ctx.Scheduler().Schedule(ctx.Context(), descriptor, false) + return ctx.Scheduler().Schedule(ctx.Context(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) } func (c *Connector) Uninstall(ctx context.Context) error { @@ -62,7 +67,10 @@ func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models. return err } - return ctx.Scheduler().Schedule(ctx.Context(), descriptor, false) + return ctx.Scheduler().Schedule(ctx.Context(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }) } var _ integration.Connector = &Connector{} diff --git a/components/payments/internal/app/connectors/stripe/task_main.go b/components/payments/internal/app/connectors/stripe/task_main.go index 86efa8525..e58301aa4 100644 --- a/components/payments/internal/app/connectors/stripe/task_main.go +++ b/components/payments/internal/app/connectors/stripe/task_main.go @@ -43,7 +43,10 @@ func ingest( return errors.Wrap(err, "failed to transform task descriptor") } - err = scheduler.Schedule(ctx, descriptor, true) + err = scheduler.Schedule(ctx, descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { return errors.Wrap(err, "scheduling connected account") } diff --git a/components/payments/internal/app/connectors/wise/client/client.go b/components/payments/internal/app/connectors/wise/client/client.go new file mode 100644 index 000000000..8222dd378 --- /dev/null +++ b/components/payments/internal/app/connectors/wise/client/client.go @@ -0,0 +1,42 @@ +package client + +import ( + "fmt" + "net/http" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +const apiEndpoint = "https://api.wise.com" + +type apiTransport struct { + APIKey string + underlying http.RoundTripper +} + +func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.APIKey)) + + return t.underlying.RoundTrip(req) +} + +type Client struct { + httpClient *http.Client +} + +func (w *Client) endpoint(path string) string { + return fmt.Sprintf("%s/%s", apiEndpoint, path) +} + +func NewClient(apiKey string) *Client { + httpClient := &http.Client{ + Transport: &apiTransport{ + APIKey: apiKey, + underlying: otelhttp.NewTransport(http.DefaultTransport), + }, + } + + return &Client{ + httpClient: httpClient, + } +} diff --git a/components/payments/internal/app/connectors/wise/client/profiles.go b/components/payments/internal/app/connectors/wise/client/profiles.go new file mode 100644 index 000000000..800a35b40 --- /dev/null +++ b/components/payments/internal/app/connectors/wise/client/profiles.go @@ -0,0 +1,35 @@ +package client + +import ( + "encoding/json" + "fmt" + "io" +) + +type Profile struct { + ID uint64 `json:"id"` + Type string `json:"type"` +} + +func (w *Client) GetProfiles() ([]Profile, error) { + var profiles []Profile + + res, err := w.httpClient.Get(w.endpoint("v1/profiles")) + if err != nil { + return profiles, err + } + + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + err = json.Unmarshal(body, &profiles) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal profiles: %w", err) + } + + return profiles, nil +} diff --git a/components/payments/internal/app/connectors/wise/client/quotes.go b/components/payments/internal/app/connectors/wise/client/quotes.go new file mode 100644 index 000000000..ba9e80ebb --- /dev/null +++ b/components/payments/internal/app/connectors/wise/client/quotes.go @@ -0,0 +1,46 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/google/uuid" +) + +type Quote struct { + ID uuid.UUID `json:"id"` +} + +func (w *Client) CreateQuote(profileID uint64, currency string, amount int64) (Quote, error) { + var response Quote + + req, err := json.Marshal(map[string]interface{}{ + "sourceCurrency": currency, + "targetCurrency": currency, + "sourceAmount": amount, + }) + if err != nil { + return response, err + } + + res, err := w.httpClient.Post(w.endpoint("v3/profiles/"+fmt.Sprint(profileID)+"/quotes"), "application/json", bytes.NewBuffer(req)) + if err != nil { + return response, err + } + + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return response, fmt.Errorf("failed to read response body: %w", err) + } + + err = json.Unmarshal(body, &response) + if err != nil { + return response, fmt.Errorf("failed to unmarshal profiles: %w", err) + } + + return response, nil +} diff --git a/components/payments/internal/app/connectors/wise/client.go b/components/payments/internal/app/connectors/wise/client/transfers.go similarity index 50% rename from components/payments/internal/app/connectors/wise/client.go rename to components/payments/internal/app/connectors/wise/client/transfers.go index 658208bdf..80cdca445 100644 --- a/components/payments/internal/app/connectors/wise/client.go +++ b/components/payments/internal/app/connectors/wise/client/transfers.go @@ -1,4 +1,4 @@ -package wise +package client import ( "bytes" @@ -8,35 +8,9 @@ import ( "io" "net/http" "time" - - "github.com/google/uuid" - - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) -const apiEndpoint = "https://api.wise.com" - -type apiTransport struct { - APIKey string - underlying http.RoundTripper -} - -func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.APIKey)) - - return t.underlying.RoundTrip(req) -} - -type client struct { - httpClient *http.Client -} - -type profile struct { - ID uint64 `json:"id"` - Type string `json:"type"` -} - -type transfer struct { +type Transfer struct { ID uint64 `json:"id"` Reference string `json:"reference"` Status string `json:"status"` @@ -56,11 +30,11 @@ type transfer struct { Rate float64 `json:"rate"` User uint64 `json:"user"` - createdAt time.Time + CreatedAt time.Time `json:"-"` } -func (t *transfer) UnmarshalJSON(data []byte) error { - type Alias transfer +func (t *Transfer) UnmarshalJSON(data []byte) error { + type Alias Transfer aux := &struct { Created string `json:"created"` @@ -75,7 +49,7 @@ func (t *transfer) UnmarshalJSON(data []byte) error { var err error - t.createdAt, err = time.Parse("2006-01-02 15:04:05", aux.Created) + t.CreatedAt, err = time.Parse("2006-01-02 15:04:05", aux.Created) if err != nil { return fmt.Errorf("failed to parse created time: %w", err) } @@ -83,35 +57,8 @@ func (t *transfer) UnmarshalJSON(data []byte) error { return nil } -func (w *client) endpoint(path string) string { - return fmt.Sprintf("%s/%s", apiEndpoint, path) -} - -func (w *client) getProfiles() ([]profile, error) { - var profiles []profile - - res, err := w.httpClient.Get(w.endpoint("v1/profiles")) - if err != nil { - return profiles, err - } - - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response body: %w", err) - } - - err = json.Unmarshal(body, &profiles) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal profiles: %w", err) - } - - return profiles, nil -} - -func (w *client) getTransfers(ctx context.Context, profile *profile) ([]transfer, error) { - var transfers []transfer +func (w *Client) GetTransfers(ctx context.Context, profile *Profile) ([]Transfer, error) { + var transfers []Transfer limit := 10 offset := 0 @@ -145,7 +92,7 @@ func (w *client) getTransfers(ctx context.Context, profile *profile) ([]transfer return nil, fmt.Errorf("failed to close response body: %w", err) } - var transferList []transfer + var transferList []Transfer err = json.Unmarshal(body, &transferList) if err != nil { @@ -164,43 +111,7 @@ func (w *client) getTransfers(ctx context.Context, profile *profile) ([]transfer return transfers, nil } -type quote struct { - ID uuid.UUID `json:"id"` -} - -func (w *client) createQuote(profileID uint64, currency string, amount int64) (quote, error) { - var response quote - - req, err := json.Marshal(map[string]interface{}{ - "sourceCurrency": currency, - "targetCurrency": currency, - "sourceAmount": amount, - }) - if err != nil { - return response, err - } - - res, err := w.httpClient.Post(w.endpoint("v3/profiles/"+fmt.Sprint(profileID)+"/quotes"), "application/json", bytes.NewBuffer(req)) - if err != nil { - return response, err - } - - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return response, fmt.Errorf("failed to read response body: %w", err) - } - - err = json.Unmarshal(body, &response) - if err != nil { - return response, fmt.Errorf("failed to unmarshal profiles: %w", err) - } - - return response, nil -} - -func (w *client) createTransfer(quote quote, targetAccount uint64, transactionID string) error { +func (w *Client) CreateTransfer(quote Quote, targetAccount uint64, transactionID string) error { req, err := json.Marshal(map[string]interface{}{ "targetAccount": targetAccount, "quoteUuid": quote.ID.String(), @@ -223,16 +134,3 @@ func (w *client) createTransfer(quote quote, targetAccount uint64, transactionID return nil } - -func newClient(apiKey string) *client { - httpClient := &http.Client{ - Transport: &apiTransport{ - APIKey: apiKey, - underlying: otelhttp.NewTransport(http.DefaultTransport), - }, - } - - return &client{ - httpClient: httpClient, - } -} diff --git a/components/payments/internal/app/connectors/wise/config.go b/components/payments/internal/app/connectors/wise/config.go index 31552dd6e..3183ce01f 100644 --- a/components/payments/internal/app/connectors/wise/config.go +++ b/components/payments/internal/app/connectors/wise/config.go @@ -3,11 +3,13 @@ package wise import ( "encoding/json" + "github.com/formancehq/payments/internal/app/connectors" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) type Config struct { - APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` + APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` + PollingPeriod connectors.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` } // String obfuscates sensitive fields and returns a string representation of the config. @@ -32,6 +34,7 @@ func (c Config) BuildTemplate() (string, configtemplate.Config) { cfg := configtemplate.NewConfig() cfg.AddParameter("apiKey", configtemplate.TypeString, true) + cfg.AddParameter("pollingPeriod", configtemplate.TypeDurationNs, false) return Name.String(), cfg } diff --git a/components/payments/internal/app/connectors/wise/connector.go b/components/payments/internal/app/connectors/wise/connector.go index 8785d1b5e..7144e71a3 100644 --- a/components/payments/internal/app/connectors/wise/connector.go +++ b/components/payments/internal/app/connectors/wise/connector.go @@ -33,7 +33,10 @@ func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models. return err } - return ctx.Scheduler().Schedule(ctx.Context(), descriptor, true) + return ctx.Scheduler().Schedule(ctx.Context(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) } func (c *Connector) Install(ctx task.ConnectorContext) error { @@ -45,7 +48,15 @@ func (c *Connector) Install(ctx task.ConnectorContext) error { return err } - return ctx.Scheduler().Schedule(ctx.Context(), descriptor, true) + return ctx.Scheduler().Schedule(ctx.Context(), descriptor, models.TaskSchedulerOptions{ + // We want to polling every c.cfg.PollingPeriod.Duration seconds the users + // and their transactions. + ScheduleOption: models.OPTIONS_RUN_INDEFINITELY, + Duration: c.cfg.PollingPeriod.Duration, + // No need to restart this task, since the connector is not existing or + // was uninstalled previously, the task does not exists in the database + Restart: false, + }) } func (c *Connector) Uninstall(ctx context.Context) error { diff --git a/components/payments/internal/app/connectors/wise/loader.go b/components/payments/internal/app/connectors/wise/loader.go index 2ced10de5..0bbc3b771 100644 --- a/components/payments/internal/app/connectors/wise/loader.go +++ b/components/payments/internal/app/connectors/wise/loader.go @@ -1,6 +1,8 @@ package wise import ( + "time" + "github.com/formancehq/payments/internal/app/integration" "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/stack/libs/go-libs/logging" @@ -23,6 +25,10 @@ func (l *Loader) Load(logger logging.Logger, config Config) integration.Connecto } func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PollingPeriod.Duration == 0 { + cfg.PollingPeriod.Duration = 2 * time.Minute + } + return cfg } diff --git a/components/payments/internal/app/connectors/wise/task_fetch_profiles.go b/components/payments/internal/app/connectors/wise/task_fetch_profiles.go index eab97cc97..0aab344a3 100644 --- a/components/payments/internal/app/connectors/wise/task_fetch_profiles.go +++ b/components/payments/internal/app/connectors/wise/task_fetch_profiles.go @@ -4,19 +4,18 @@ import ( "context" "fmt" + "github.com/formancehq/payments/internal/app/connectors/wise/client" "github.com/formancehq/payments/internal/app/models" - "github.com/formancehq/payments/internal/app/task" - "github.com/formancehq/stack/libs/go-libs/logging" ) -func taskFetchProfiles(logger logging.Logger, client *client) task.Task { +func taskFetchProfiles(logger logging.Logger, client *client.Client) task.Task { return func( ctx context.Context, scheduler task.Scheduler, ) error { - profiles, err := client.getProfiles() + profiles, err := client.GetProfiles() if err != nil { return err } @@ -33,7 +32,10 @@ func taskFetchProfiles(logger logging.Logger, client *client) task.Task { return err } - err = scheduler.Schedule(ctx, descriptor, true) + err = scheduler.Schedule(ctx, descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) if err != nil { return err } diff --git a/components/payments/internal/app/connectors/wise/task_fetch_transfers.go b/components/payments/internal/app/connectors/wise/task_fetch_transfers.go index 0038cfda8..91703ab81 100644 --- a/components/payments/internal/app/connectors/wise/task_fetch_transfers.go +++ b/components/payments/internal/app/connectors/wise/task_fetch_transfers.go @@ -6,20 +6,20 @@ import ( "fmt" "time" - "github.com/formancehq/payments/internal/app/models" - + "github.com/formancehq/payments/internal/app/connectors/wise/client" "github.com/formancehq/payments/internal/app/ingestion" + "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/payments/internal/app/task" "github.com/formancehq/stack/libs/go-libs/logging" ) -func taskFetchTransfers(logger logging.Logger, client *client, profileID uint64) task.Task { +func taskFetchTransfers(logger logging.Logger, c *client.Client, profileID uint64) task.Task { return func( ctx context.Context, scheduler task.Scheduler, ingester ingestion.Ingester, ) error { - transfers, err := client.getTransfers(ctx, &profile{ + transfers, err := c.GetTransfers(ctx, &client.Profile{ ID: profileID, }) if err != nil { @@ -56,7 +56,7 @@ func taskFetchTransfers(logger logging.Logger, client *client, profileID uint64) }, Provider: models.ConnectorProviderWise, }, - CreatedAt: transfer.createdAt, + CreatedAt: transfer.CreatedAt, Reference: fmt.Sprintf("%d", transfer.ID), Type: models.PaymentTypeTransfer, Status: matchTransferStatus(transfer.Status), @@ -121,7 +121,10 @@ func taskFetchTransfers(logger logging.Logger, client *client, profileID uint64) return err } - return scheduler.Schedule(ctx, descriptor, true) + return scheduler.Schedule(ctx, descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) } } diff --git a/components/payments/internal/app/connectors/wise/task_main.go b/components/payments/internal/app/connectors/wise/task_main.go new file mode 100644 index 000000000..6a13480f7 --- /dev/null +++ b/components/payments/internal/app/connectors/wise/task_main.go @@ -0,0 +1,38 @@ +package wise + +import ( + "context" + "errors" + + "github.com/formancehq/payments/internal/app/models" + "github.com/formancehq/payments/internal/app/task" + "github.com/formancehq/stack/libs/go-libs/logging" +) + +// taskMain is the main task of the connector. It launches the other tasks. +func taskMain(logger logging.Logger) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + logger.Info(taskNameMain) + + taskUsers, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Fetch users from client", + Key: taskNameFetchProfiles, + }) + if err != nil { + return err + } + + err = scheduler.Schedule(ctx, taskUsers, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: true, + }) + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { + return err + } + + return nil + } +} diff --git a/components/payments/internal/app/connectors/wise/task_resolve.go b/components/payments/internal/app/connectors/wise/task_resolve.go index 39e5bce46..1ae1e0938 100644 --- a/components/payments/internal/app/connectors/wise/task_resolve.go +++ b/components/payments/internal/app/connectors/wise/task_resolve.go @@ -5,12 +5,13 @@ import ( "github.com/google/uuid" + "github.com/formancehq/payments/internal/app/connectors/wise/client" "github.com/formancehq/payments/internal/app/task" - "github.com/formancehq/stack/libs/go-libs/logging" ) const ( + taskNameMain = "main" taskNameFetchTransfers = "fetch-transfers" taskNameFetchProfiles = "fetch-profiles" taskNameTransfer = "transfer" @@ -33,10 +34,12 @@ type Transfer struct { } func resolveTasks(logger logging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task { - client := newClient(config.APIKey) + client := client.NewClient(config.APIKey) return func(taskDefinition TaskDescriptor) task.Task { switch taskDefinition.Key { + case taskNameMain: + return taskMain(logger) case taskNameFetchProfiles: return taskFetchProfiles(logger, client) case taskNameFetchTransfers: diff --git a/components/payments/internal/app/connectors/wise/task_transfer.go b/components/payments/internal/app/connectors/wise/task_transfer.go index c0f480248..41c29a594 100644 --- a/components/payments/internal/app/connectors/wise/task_transfer.go +++ b/components/payments/internal/app/connectors/wise/task_transfer.go @@ -7,17 +7,17 @@ import ( "github.com/google/uuid" + "github.com/formancehq/payments/internal/app/connectors/wise/client" "github.com/formancehq/payments/internal/app/task" - "github.com/formancehq/stack/libs/go-libs/logging" ) -func taskTransfer(logger logging.Logger, client *client, transfer Transfer) task.Task { +func taskTransfer(logger logging.Logger, client *client.Client, transfer Transfer) task.Task { return func( ctx context.Context, scheduler task.Scheduler, ) error { - profiles, err := client.getProfiles() + profiles, err := client.GetProfiles() if err != nil { return err } @@ -30,7 +30,7 @@ func taskTransfer(logger logging.Logger, client *client, transfer Transfer) task } } - quote, err := client.createQuote(profileID, transfer.Currency, transfer.Amount) + quote, err := client.CreateQuote(profileID, transfer.Currency, transfer.Amount) if err != nil { return err } @@ -42,7 +42,7 @@ func taskTransfer(logger logging.Logger, client *client, transfer Transfer) task transactionID := uuid.New().String() - err = client.createTransfer(quote, destinationAccount, transactionID) + err = client.CreateTransfer(quote, destinationAccount, transactionID) if err != nil { return err } diff --git a/components/payments/internal/app/integration/manager_test.go b/components/payments/internal/app/integration/manager_test.go index c9affa335..6580831ef 100644 --- a/components/payments/internal/app/integration/manager_test.go +++ b/components/payments/internal/app/integration/manager_test.go @@ -116,7 +116,10 @@ func TestUninstallConnector(t *testing.T) { } }). WithInstall(func(ctx task.ConnectorContext) error { - return ctx.Scheduler().Schedule(ctx.Context(), []byte(uuid.New().String()), false) + return ctx.Scheduler().Schedule(ctx.Context(), []byte(uuid.New().String()), models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }) }). WithUninstall(func(ctx context.Context) error { close(uninstalled) diff --git a/components/payments/internal/app/models/task.go b/components/payments/internal/app/models/task.go index 242f03c8e..ba7c9ba78 100644 --- a/components/payments/internal/app/models/task.go +++ b/components/payments/internal/app/models/task.go @@ -11,18 +11,27 @@ import ( "github.com/google/uuid" ) +type ScheduleOption int + +const ( + OPTIONS_RUN_NOW ScheduleOption = iota + OPTIONS_RUN_IN_DURATION + OPTIONS_RUN_INDEFINITELY +) + type Task struct { bun.BaseModel `bun:"tasks.task"` - ID uuid.UUID `bun:",pk,nullzero"` - ConnectorID uuid.UUID - CreatedAt time.Time `bun:",nullzero"` - UpdatedAt time.Time `bun:",nullzero"` - Name string - Descriptor json.RawMessage - Status TaskStatus - Error string - State json.RawMessage + ID uuid.UUID `bun:",pk,nullzero"` + ConnectorID uuid.UUID + CreatedAt time.Time `bun:",nullzero"` + UpdatedAt time.Time `bun:",nullzero"` + Name string + Descriptor json.RawMessage + SchedulerOptions TaskSchedulerOptions + Status TaskStatus + Error string + State json.RawMessage Connector *Connector `bun:"rel:belongs-to,join:connector_id=id"` } @@ -31,6 +40,12 @@ func (t Task) GetDescriptor() TaskDescriptor { return TaskDescriptor(t.Descriptor) } +type TaskSchedulerOptions struct { + ScheduleOption ScheduleOption + Duration time.Duration + Restart bool +} + type TaskDescriptor json.RawMessage func (td TaskDescriptor) ToMessage() json.RawMessage { diff --git a/components/payments/internal/app/storage/migrations.go b/components/payments/internal/app/storage/migrations.go index ab582517c..eca0773df 100644 --- a/components/payments/internal/app/storage/migrations.go +++ b/components/payments/internal/app/storage/migrations.go @@ -332,6 +332,18 @@ func registerMigrations(migrator *migrations.Migrator) { return err } + return nil + }, + }, + migrations.Migration{ + Up: func(tx bun.Tx) error { + _, err := tx.Exec(` + ALTER TABLE tasks.task ADD COLUMN IF NOT EXISTS "scheduler_options" json; + `) + if err != nil { + return err + } + return nil }, }, diff --git a/components/payments/internal/app/storage/task.go b/components/payments/internal/app/storage/task.go index 73e4f6a38..5df547554 100644 --- a/components/payments/internal/app/storage/task.go +++ b/components/payments/internal/app/storage/task.go @@ -51,7 +51,14 @@ func (s *Storage) UpdateTaskState(ctx context.Context, provider models.Connector return nil } -func (s *Storage) FindAndUpsertTask(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus, taskErr string) (*models.Task, error) { +func (s *Storage) FindAndUpsertTask( + ctx context.Context, + provider models.ConnectorProvider, + descriptor models.TaskDescriptor, + status models.TaskStatus, + schedulerOptions models.TaskSchedulerOptions, + taskErr string, +) (*models.Task, error) { _, err := s.GetTaskByDescriptor(ctx, provider, descriptor) if err != nil && !errors.Is(err, ErrNotFound) { return nil, e("failed to get task", err) @@ -63,7 +70,7 @@ func (s *Storage) FindAndUpsertTask(ctx context.Context, provider models.Connect return nil, e("failed to update task", err) } } else { - err = s.CreateTask(ctx, provider, descriptor, status) + err = s.CreateTask(ctx, provider, descriptor, status, schedulerOptions) if err != nil { return nil, e("failed to upsert task", err) } @@ -72,16 +79,17 @@ func (s *Storage) FindAndUpsertTask(ctx context.Context, provider models.Connect return s.GetTaskByDescriptor(ctx, provider, descriptor) } -func (s *Storage) CreateTask(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus) error { +func (s *Storage) CreateTask(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus, schedulerOptions models.TaskSchedulerOptions) error { connector, err := s.GetConnector(ctx, provider) if err != nil { return e("failed to get connector", err) } _, err = s.db.NewInsert().Model(&models.Task{ - ConnectorID: connector.ID, - Descriptor: descriptor.ToMessage(), - Status: status, + ConnectorID: connector.ID, + Descriptor: descriptor.ToMessage(), + Status: status, + SchedulerOptions: schedulerOptions, }).Exec(ctx) if err != nil { return e("failed to create task", err) diff --git a/components/payments/internal/app/task/scheduler.go b/components/payments/internal/app/task/scheduler.go index f8e23edeb..8e0209002 100644 --- a/components/payments/internal/app/task/scheduler.go +++ b/components/payments/internal/app/task/scheduler.go @@ -29,7 +29,7 @@ var ( ) type Scheduler interface { - Schedule(ctx context.Context, p models.TaskDescriptor, restart bool) error + Schedule(ctx context.Context, p models.TaskDescriptor, options models.TaskSchedulerOptions) error } type taskHolder struct { @@ -69,7 +69,7 @@ func (s *DefaultTaskScheduler) ReadTaskByDescriptor(ctx context.Context, descrip return s.store.GetTaskByDescriptor(ctx, s.provider, taskDescriptor) } -func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.TaskDescriptor, restart bool) error { +func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) error { s.mu.Lock() defer s.mu.Unlock() @@ -82,7 +82,7 @@ func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.T return ErrAlreadyScheduled } - if !restart { + if !options.Restart { _, err := s.ReadTaskByDescriptor(ctx, descriptor) if err == nil { return nil @@ -98,7 +98,7 @@ func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.T return nil } - if err := s.startTask(ctx, descriptor); err != nil { + if err := s.startTask(ctx, descriptor, options); err != nil { return errors.Wrap(err, "starting task") } @@ -141,7 +141,7 @@ func (s *DefaultTaskScheduler) Restore(ctx context.Context) error { } for _, task := range tasks { - err = s.startTask(ctx, task.GetDescriptor()) + err = s.startTask(ctx, task.GetDescriptor(), task.SchedulerOptions) if err != nil { s.logger(ctx).Errorf("Unable to restore task %s: %s", task.ID, err) } @@ -203,7 +203,9 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde return } - err = s.startTask(ctx, oldestPendingTask.GetDescriptor()) + err = s.startTask(ctx, oldestPendingTask.GetDescriptor(), models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + }) if err != nil { logging.FromContext(ctx).Error(err) } @@ -211,9 +213,9 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde type StopChan chan chan struct{} -func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.TaskDescriptor) error { +func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) error { task, err := s.store.FindAndUpsertTask(ctx, s.provider, descriptor, - models.TaskStatusActive, "") + models.TaskStatusActive, options, "") if err != nil { return errors.Wrap(err, "finding task and update") } @@ -298,35 +300,90 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. s.tasks[taskID] = holder - go func() { - logger.Infof("Starting task...") + switch options.ScheduleOption { + case models.OPTIONS_RUN_NOW: + options.Duration = 0 + fallthrough + case models.OPTIONS_RUN_IN_DURATION: + go func() { + if options.Duration > 0 { + logger.Infof("Waiting %s before starting task...", options.Duration) + time.Sleep(options.Duration) + } + + logger.Infof("Starting task...") + + defer func() { + defer span.End() + defer s.deleteTask(ctx, holder) + + if e := recover(); e != nil { + s.registerTaskError(ctx, holder, e) + debug.PrintStack() - defer func() { - defer span.End() - defer s.deleteTask(ctx, holder) + return + } + }() - if e := recover(); e != nil { - s.registerTaskError(ctx, holder, e) - debug.PrintStack() + err = container.Invoke(taskResolver) + if err != nil { + s.registerTaskError(ctx, holder, err) return } + + logger.Infof("Task terminated with success") + + err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, models.TaskStatusTerminated, "") + if err != nil { + logger.Error("Error updating task status: %s", err) + } }() + case models.OPTIONS_RUN_INDEFINITELY: + go func() { + defer func() { + defer span.End() + defer s.deleteTask(ctx, holder) - err = container.Invoke(taskResolver) - if err != nil { - s.registerTaskError(ctx, holder, err) + if e := recover(); e != nil { + s.registerTaskError(ctx, holder, e) + debug.PrintStack() - return - } + return + } + }() - logger.Infof("Task terminated with success") + // launch it once before starting the ticker + err = container.Invoke(taskResolver) + if err != nil { + s.registerTaskError(ctx, holder, err) - err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, models.TaskStatusTerminated, "") - if err != nil { - logger.Error("Error updating task status: %s", err) - } - }() + return + } + + logger.Infof("Starting task...") + ticker := time.NewTicker(options.Duration) + for { + select { + case ch := <-holder.stopChan: + logger.Infof("Stopping task...") + close(ch) + return + case <-ctx.Done(): + return + case <-ticker.C: + logger.Infof("Polling trigger, running task...") + err = container.Invoke(taskResolver) + if err != nil { + s.registerTaskError(ctx, holder, err) + + return + } + } + } + + }() + } return nil } diff --git a/components/payments/internal/app/task/scheduler_test.go b/components/payments/internal/app/task/scheduler_test.go index 3d07b1493..5c09cfaca 100644 --- a/components/payments/internal/app/task/scheduler_test.go +++ b/components/payments/internal/app/task/scheduler_test.go @@ -86,7 +86,10 @@ func TestTaskScheduler(t *testing.T) { }), 1) descriptor := newDescriptor() - err := scheduler.Schedule(context.TODO(), descriptor, false) + err := scheduler.Schedule(context.TODO(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }) require.NoError(t, err) require.Eventually(t, TaskActive(store, provider, descriptor), time.Second, 100*time.Millisecond) @@ -109,11 +112,17 @@ func TestTaskScheduler(t *testing.T) { }), 1) descriptor := newDescriptor() - err := scheduler.Schedule(context.TODO(), descriptor, false) + err := scheduler.Schedule(context.TODO(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }) require.NoError(t, err) require.Eventually(t, TaskActive(store, provider, descriptor), time.Second, 100*time.Millisecond) - err = scheduler.Schedule(context.TODO(), descriptor, false) + err = scheduler.Schedule(context.TODO(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }) require.Equal(t, ErrAlreadyScheduled, err) }) @@ -130,7 +139,10 @@ func TestTaskScheduler(t *testing.T) { }), 1) descriptor := newDescriptor() - err := scheduler.Schedule(context.TODO(), descriptor, false) + err := scheduler.Schedule(context.TODO(), descriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + }) require.NoError(t, err) require.Eventually(t, TaskFailed(store, provider, descriptor, "test"), time.Second, 100*time.Millisecond) @@ -173,8 +185,14 @@ func TestTaskScheduler(t *testing.T) { panic("unknown descriptor") }), 1) - require.NoError(t, scheduler.Schedule(context.TODO(), descriptor1, false)) - require.NoError(t, scheduler.Schedule(context.TODO(), descriptor2, false)) + require.NoError(t, scheduler.Schedule(context.TODO(), descriptor1, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + })) + require.NoError(t, scheduler.Schedule(context.TODO(), descriptor2, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + })) require.Eventually(t, TaskActive(store, provider, descriptor1), time.Second, 100*time.Millisecond) require.Eventually(t, TaskPending(store, provider, descriptor2), time.Second, 100*time.Millisecond) close(task1Terminated) @@ -198,14 +216,20 @@ func TestTaskScheduler(t *testing.T) { case string(mainDescriptor): return func(ctx context.Context, scheduler Scheduler) { <-ctx.Done() - require.NoError(t, scheduler.Schedule(ctx, workerDescriptor, false)) + require.NoError(t, scheduler.Schedule(ctx, workerDescriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + })) } default: panic("should not be called") } }), 1) - require.NoError(t, scheduler.Schedule(context.TODO(), mainDescriptor, false)) + require.NoError(t, scheduler.Schedule(context.TODO(), mainDescriptor, models.TaskSchedulerOptions{ + ScheduleOption: models.OPTIONS_RUN_NOW, + Restart: false, + })) require.Eventually(t, TaskActive(store, provider, mainDescriptor), time.Second, 100*time.Millisecond) require.NoError(t, scheduler.Shutdown(context.TODO())) require.Eventually(t, TaskTerminated(store, provider, mainDescriptor), time.Second, 100*time.Millisecond) diff --git a/components/payments/internal/app/task/store.go b/components/payments/internal/app/task/store.go index 1791bd88c..ea8310ddd 100644 --- a/components/payments/internal/app/task/store.go +++ b/components/payments/internal/app/task/store.go @@ -12,7 +12,7 @@ import ( type Repository interface { UpdateTaskStatus(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus, err string) error - FindAndUpsertTask(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus, err string) (*models.Task, error) + FindAndUpsertTask(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus, schedulerOptions models.TaskSchedulerOptions, err string) (*models.Task, error) ListTasksByStatus(ctx context.Context, provider models.ConnectorProvider, status models.TaskStatus) ([]models.Task, error) ListTasks(ctx context.Context, provider models.ConnectorProvider, pagination storage.Paginator) ([]models.Task, storage.PaginationDetails, error) ReadOldestPendingTask(ctx context.Context, provider models.ConnectorProvider) (*models.Task, error) diff --git a/components/payments/internal/app/task/storememory.go b/components/payments/internal/app/task/storememory.go index ed383df87..c3538ad97 100644 --- a/components/payments/internal/app/task/storememory.go +++ b/components/payments/internal/app/task/storememory.go @@ -144,8 +144,13 @@ func (s *InMemoryStore) ListTasksByStatus(ctx context.Context, return ret, nil } -func (s *InMemoryStore) FindAndUpsertTask(ctx context.Context, - provider models.ConnectorProvider, descriptor models.TaskDescriptor, status models.TaskStatus, taskErr string, +func (s *InMemoryStore) FindAndUpsertTask( + ctx context.Context, + provider models.ConnectorProvider, + descriptor models.TaskDescriptor, + status models.TaskStatus, + options models.TaskSchedulerOptions, + taskErr string, ) (*models.Task, error) { err := s.UpdateTaskStatus(ctx, provider, descriptor, status, taskErr) if err != nil {