-
Notifications
You must be signed in to change notification settings - Fork 134
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
648 changed files
with
6,708 additions
and
1,942 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# openapi generated pages | ||
**/*.api.mdx |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"position": 55, | ||
"label": "Build Custom Network Ingestion Pipeline", | ||
"link": { | ||
"type": "generated-index" | ||
} | ||
} |
290 changes: 290 additions & 0 deletions
290
docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,290 @@ | ||
--- | ||
title: Ingestion Pipeline Sample Code | ||
sidebar_position: 30 | ||
--- | ||
|
||
Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion SDK](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber. | ||
|
||
This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies). | ||
|
||
Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline` | ||
|
||
### `go.mod` | ||
|
||
<CodeExample> | ||
|
||
``` | ||
module example/pipeline | ||
go 1.22 | ||
toolchain go1.22.1 | ||
require ( | ||
github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58 | ||
github.com/zeromq/goczmq v4.1.0+incompatible | ||
) | ||
``` | ||
|
||
</CodeExample> | ||
|
||
### `main.go` | ||
|
||
<CodeExample> | ||
|
||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"io" | ||
"log" | ||
"os" | ||
"os/signal" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/stellar/go/amount" | ||
"github.com/stellar/go/historyarchive" | ||
"github.com/stellar/go/ingest" | ||
"github.com/stellar/go/ingest/ledgerbackend" | ||
"github.com/stellar/go/network" | ||
"github.com/stellar/go/support/datastore" | ||
"github.com/stellar/go/support/storage" | ||
"github.com/stellar/go/xdr" | ||
|
||
"github.com/zeromq/goczmq" | ||
) | ||
|
||
// Application specifics | ||
type AppPayment struct { | ||
Timestamp uint | ||
BuyerAccountId string | ||
SellerAccountId string | ||
AssetCode string | ||
Amount string | ||
} | ||
|
||
// General stream topology | ||
type Message struct { | ||
Payload []byte | ||
} | ||
|
||
type Processor interface { | ||
Process(context.Context, Message) error | ||
} | ||
|
||
type Publisher interface { | ||
Subscribe(receiver Processor) | ||
} | ||
|
||
// Ingestion Pipeline Processors | ||
type ZeroMQOutboundAdapter struct { | ||
Publisher *goczmq.Sock | ||
} | ||
|
||
func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error { | ||
_, err := adapter.Publisher.Write(msg.Payload) | ||
return err | ||
} | ||
|
||
type AppPaymentTransformer struct { | ||
processors []Processor | ||
networkPassPhrase string | ||
} | ||
|
||
func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) { | ||
transformer.processors = append(transformer.processors, receiver) | ||
} | ||
|
||
func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error { | ||
ledgerCloseMeta := xdr.LedgerCloseMeta{} | ||
err := ledgerCloseMeta.UnmarshalBinary(msg.Payload) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta") | ||
} | ||
|
||
ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence()) | ||
} | ||
|
||
closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime) | ||
|
||
// scan all transactions in a ledger for payments to derive new model from | ||
transaction, err := ledgerTxReader.Read() | ||
for ; err == nil; transaction, err = ledgerTxReader.Read() { | ||
for _, op := range transaction.Envelope.Operations() { | ||
switch op.Body.Type { | ||
case xdr.OperationTypePayment: | ||
networkPayment := op.Body.MustPaymentOp() | ||
myPayment := AppPayment{ | ||
Timestamp: closeTime, | ||
BuyerAccountId: networkPayment.Destination.Address(), | ||
SellerAccountId: op.SourceAccount.Address(), | ||
AssetCode: networkPayment.Asset.StringCanonical(), | ||
Amount: amount.String(networkPayment.Amount), | ||
} | ||
jsonBytes, err := json.Marshal(myPayment) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, processor := range transformer.processors { | ||
processor.Process(ctx, Message{Payload: jsonBytes}) | ||
} | ||
} | ||
} | ||
} | ||
if err != io.EOF { | ||
return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence()) | ||
} | ||
return nil | ||
} | ||
|
||
type CaptiveCoreInboundAdapter struct { | ||
TomlParams ledgerbackend.CaptiveCoreTomlParams | ||
processors []Processor | ||
historyArchiveURLs []string | ||
coreConfigNetworkTemplate []byte | ||
} | ||
|
||
func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) { | ||
adapter.processors = append(adapter.processors, receiver) | ||
} | ||
|
||
func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error { | ||
// Setup captive core config to use the Pubnet network | ||
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams) | ||
if err != nil { | ||
return errors.Wrap(err, "error creating captive core toml") | ||
} | ||
|
||
captiveConfig := ledgerbackend.CaptiveCoreConfig{ | ||
BinaryPath: adapter.TomlParams.CoreBinaryPath, | ||
HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs, | ||
Context: ctx, | ||
Toml: captiveCoreToml, | ||
} | ||
|
||
// Create a new captive core backend, the origin of ingestion pipeline | ||
captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig) | ||
if err != nil { | ||
return errors.Wrap(err, "error creating captive core instance") | ||
} | ||
|
||
// Create a client to the network's history archives | ||
historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{ | ||
ConnectOptions: storage.ConnectOptions{ | ||
UserAgent: "my_app", | ||
Context: ctx, | ||
}, | ||
}) | ||
|
||
if err != nil { | ||
return errors.Wrap(err, "error creating history archive client") | ||
} | ||
|
||
// Acquire the most recent ledger on network | ||
latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive) | ||
if err != nil { | ||
return errors.Wrap(err, "error getting latest ledger") | ||
} | ||
|
||
// Tell the captive core instance to emit LedgerCloseMeta starting at | ||
// latest network ledger and continuing indefintely, streaming. | ||
if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil { | ||
return errors.Wrap(err, "error preparing captive core ledger range") | ||
} | ||
|
||
// Run endless loop that receives LedgerCloseMeta from captive core for each new | ||
// ledger generated by the network and pushes it to next processors in pipeline | ||
for nextLedger := latestNetworkLedger; true; nextLedger++ { | ||
ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger) | ||
} | ||
|
||
payload, err := ledgerCloseMeta.MarshalBinary() | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger) | ||
} | ||
|
||
log.Printf("Processing Ledger %v", nextLedger) | ||
for _, processor := range adapter.processors { | ||
if err := processor.Process(ctx, Message{Payload: payload}); err != nil { | ||
return errors.Wrapf(err, "failed to process ledger %d", nextLedger) | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func main() { | ||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) | ||
defer stop() | ||
|
||
// create the inbound 'source of origin' adapter, | ||
// imports network data using this captive core config | ||
networkInboundAdapter := &CaptiveCoreInboundAdapter{ | ||
historyArchiveURLs: network.TestNetworkhistoryArchiveURLs, | ||
coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig, | ||
TomlParams: ledgerbackend.CaptiveCoreTomlParams{ | ||
NetworkPassphrase: network.TestNetworkPassphrase, | ||
HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs, | ||
UseDB: true, | ||
CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH | ||
}, | ||
} | ||
|
||
// create the app transformer to convert network data to application data model | ||
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase} | ||
|
||
// create the outbound adapter, this is the end point of the pipeline | ||
// publishes application data model as messages to a broker | ||
publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555") | ||
if err != nil { | ||
log.Printf("error creating 0MQ publisher: %v", err) | ||
return | ||
} | ||
defer publisher.Destroy() | ||
outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher} | ||
|
||
// wire up the ingestion pipeline and let it run | ||
appTransformer.Subscribe(outboundAdapter) | ||
networkInboundAdapter.Subscribe(appTransformer) | ||
log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx)) | ||
} | ||
``` | ||
|
||
</CodeExample> | ||
|
||
### `distributed_payment_subsciber.py` | ||
|
||
A Python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. Make sure to `pip install pyzmq` | ||
|
||
<CodeExample> | ||
|
||
```python | ||
import sys | ||
import zmq | ||
import json | ||
|
||
# Socket to talk to server | ||
context = zmq.Context() | ||
socket = context.socket(zmq.SUB) | ||
|
||
print("Collecting next 10 payments from pipeline ...") | ||
socket.connect("tcp://127.0.0.1:5555") | ||
socket.subscribe("") | ||
|
||
for request in range(10): | ||
|
||
message = socket.recv() | ||
json_object = json.loads(message) | ||
json_formatted_str = json.dumps(json_object, indent=2) | ||
print(f"Received payment:\n\n{json_formatted_str}") | ||
|
||
``` | ||
|
||
</CodeExample> |
Oops, something went wrong.
6b2cdb9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the default GitHub merge. Hopefully it resolves outstanding questions in #657. I've just been running the local version as instructed with
npx docusaurus start
. 🛠️