Skip to content

Commit

Permalink
Add common interface between in-memory and on-disk transaction storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic committed Apr 15, 2024
1 parent 367a936 commit 3196ade
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 5 deletions.
7 changes: 4 additions & 3 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Config struct {
Logger *log.Entry
DB db.ReadWriter
EventStore *events.MemoryStore
TransactionStore *transactions.MemoryStore
TransactionStore transactions.TransactionStore
NetworkPassPhrase string
Archive historyarchive.ArchiveInterface
LedgerBackend backends.LedgerBackend
Expand Down Expand Up @@ -134,7 +134,7 @@ type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
transactionStore transactions.TransactionStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
Expand Down Expand Up @@ -308,7 +308,8 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
return err
}
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())
With(prometheus.Labels{"type": "ledger_close_meta"}).
Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (h Handler) Close() {

type HandlerParams struct {
EventStore *events.MemoryStore
TransactionStore *transactions.MemoryStore
TransactionStore transactions.TransactionStore
LedgerEntryReader db.LedgerEntryReader
LedgerReader db.LedgerReader
Logger *log.Entry
Expand Down
45 changes: 45 additions & 0 deletions cmd/soroban-rpc/internal/transactions/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package transactions

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

// DatabaseStore is an on-disk (sqlite) store of Stellar transactions.
type DatabaseStore struct {
// passphrase is an immutable string containing the Stellar network
// passphrase and accessing it does not need to be protected by the lock
passphrase string

lock sync.RWMutex
transactions map[xdr.Hash]transaction
transactionsByLedger *ledgerbucketwindow.LedgerBucketWindow[[]xdr.Hash]
transactionDurationMetric *prometheus.SummaryVec
transactionCountMetric prometheus.Summary
}

func NewDatabaseStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) TransactionStore {
return NewMemoryStore(daemon, networkPassphrase, retentionWindow)
}

// func (m *DatabaseStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error {
// // startTime := time.Now()
// return nil
// }

// // GetLedgerRange returns the first and latest ledger available in the store.
// func (m *DatabaseStore) GetLedgerRange() ledgerbucketwindow.LedgerRange {
// m.lock.RLock()
// defer m.lock.RUnlock()
// return m.transactionsByLedger.GetLedgerRange()
// }

// // GetTransaction obtains a transaction from the store and whether it's present and the current store range
// func (m *DatabaseStore) GetTransaction(hash xdr.Hash) (Transaction, bool, ledgerbucketwindow.LedgerRange) {
// return Transaction{}, false, ledgerbucketwindow.LedgerRange{}
// }
15 changes: 15 additions & 0 deletions cmd/soroban-rpc/internal/transactions/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package transactions

import (
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

// TransactionStore lets you ingest (write) and query (read) transactions from
// an abstract backend storage (i.e. via in-memory or sqlite).
type TransactionStore interface {
IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error
GetLedgerRange() ledgerbucketwindow.LedgerRange
GetTransaction(hash xdr.Hash) (Transaction, bool, ledgerbucketwindow.LedgerRange)
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type MemoryStore struct {
// will be included in the MemoryStore. If the MemoryStore
// is full, any transactions from new ledgers will evict
// older entries outside the retention window.
func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) *MemoryStore {
func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) TransactionStore {
window := ledgerbucketwindow.NewLedgerBucketWindow[[]xdr.Hash](retentionWindow)

// transactionDurationMetric is a metric for measuring latency of transaction store operations
Expand Down Expand Up @@ -75,6 +75,7 @@ func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentio
// removed from the store.
func (m *MemoryStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error {
startTime := time.Now()

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(m.networkPassphrase, ledgerCloseMeta)
if err != nil {
return err
Expand Down Expand Up @@ -135,6 +136,7 @@ func (m *MemoryStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) er
for hash, tx := range hashMap {
m.transactions[hash] = tx
}

m.transactionDurationMetric.With(prometheus.Labels{"operation": "ingest"}).Observe(time.Since(startTime).Seconds())
m.transactionCountMetric.Observe(float64(txCount))
return nil
Expand Down

0 comments on commit 3196ade

Please sign in to comment.