Skip to content

Commit

Permalink
feat: add SubscribeTickers func to providers (#592)
Browse files Browse the repository at this point in the history
* feat: add SubscribeTickers to binance provider

* feat: add SubscribeTickers to huobi provider

* feat: add SubscribeTickers to huobi provider

* feat: add SubscribeTickers to Okx provider

* add subscribe tickers func to changelog

* removed unused method

* fix: set right amount of topics on reconnect

* chore: update godocs

* chore: update godocs

* chore: update godocs

* chore: update godocs

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
  • Loading branch information
RafilxTenfen and alexanderbez authored Mar 2, 2022
1 parent f321b54 commit 380ec49
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 245 deletions.
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#551](https://github.com/umee-network/umee/pull/551) Update Binance provider to use WebSocket.
- [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket.
- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket.
- [#592](https://github.com/umee-network/umee/pull/592) Add subscribe ticker function to the following providers: Binance, Huobi, Kraken, and Okx.

### Bug Fixes

Expand Down
149 changes: 89 additions & 60 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,38 @@ type (
wsURL url.URL
wsClient *websocket.Conn
logger zerolog.Logger
mu sync.Mutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
subscribedPairs []types.CurrencyPair
mtx sync.Mutex
tickers map[string]BinanceTicker // Symbol => BinanceTicker
candles map[string]BinanceCandle // Symbol => BinanceCandle
subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair
}

// BinanceTicker ticker price response
// https://pkg.go.dev/encoding/json#Unmarshal
// Unmarshal matches incoming object keys to the keys
// used by Marshal (either the struct field name or its tag),
// preferring an exact match but also accepting a case-insensitive match
// C is not used, but it avoids to implement specific UnmarshalJSON
// BinanceTicker ticker price response. https://pkg.go.dev/encoding/json#Unmarshal
// Unmarshal matches incoming object keys to the keys used by Marshal (either the
// struct field name or its tag), preferring an exact match but also accepting a
// case-insensitive match. C field which is Statistics close time is not used, but
// it avoids to implement specific UnmarshalJSON.
BinanceTicker struct {
Symbol string `json:"s"` // Symbol ex.: BTCUSDT
LastPrice string `json:"c"` // Last price ex.: 0.0025
Volume string `json:"v"` // Total traded base asset volume ex.: 1000
C uint64 `json:"C"` // Statistics close time
}

// BinanceCandleMetadata candle metadata used to compute tvwap price.
BinanceCandleMetadata struct {
Close string `json:"c"` // Price at close
TimeStamp int64 `json:"T"` // Close time in unix epoch ex.: 1645756200000
Volume string `json:"v"` // Volume during period
}

// BinanceCandle candle binance websocket channel "kline_1m" response.
BinanceCandle struct {
Symbol string `json:"s"` // Symbol ex.: BTCUSDT
Metadata BinanceCandleMetadata `json:"k"` // Metadata for candle
}

// BinanceSubscribeMsg Msg to subscribe all the tickers channels
// BinanceSubscribeMsg Msg to subscribe all the tickers channels.
BinanceSubscriptionMsg struct {
Method string `json:"method"` // SUBSCRIBE/UNSUBSCRIBE
Params []string `json:"params"` // streams to subscribe ex.: usdtatom@ticker
Expand All @@ -87,13 +88,10 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
logger: logger.With().Str("provider", "binance").Logger(),
tickers: map[string]BinanceTicker{},
candles: map[string]BinanceCandle{},
subscribedPairs: pairs,
subscribedPairs: map[string]types.CurrencyPair{},
}

if err := provider.subscribeTickers(pairs...); err != nil {
return nil, err
}
if err := provider.subscribeCandles(pairs...); err != nil {
if err := provider.SubscribeTickers(pairs...); err != nil {
return nil, err
}

Expand All @@ -102,6 +100,26 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
return provider, nil
}

// SubscribeTickers subscribe all currency pairs into ticker and candle channels.
func (p *BinanceProvider) SubscribeTickers(cps ...types.CurrencyPair) error {
pairs := make([]string, len(cps)*2)

iterator := 0
for _, cp := range cps {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}

if err := p.subscribePairs(pairs...); err != nil {
return err
}

p.setSubscribedPairs(cps...)
return nil
}

// GetTickerPrices returns the tickerPrices based on the provided pairs.
func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) {
tickerPrices := make(map[string]TickerPrice, len(pairs))
Expand Down Expand Up @@ -150,58 +168,36 @@ func (p *BinanceProvider) messageReceived(messageType int, bz []byte) {
if err := json.Unmarshal(bz, &tickerResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal ticker response")
}
if err := json.Unmarshal(bz, &candleResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal candle response")
}

if len(tickerResp.LastPrice) != 0 {
p.setTickerPair(tickerResp)
return
}

if err := json.Unmarshal(bz, &candleResp); err != nil {
p.logger.Debug().Err(err).Msg("could not unmarshal candle response")
return
}
if len(candleResp.Metadata.Close) != 0 {
p.setCandlePair(candleResp)
}
}

func (p *BinanceProvider) setTickerPair(ticker BinanceTicker) {
p.mu.Lock()
defer p.mu.Unlock()
p.mtx.Lock()
defer p.mtx.Unlock()
p.tickers[ticker.Symbol] = ticker
}

func (p *BinanceProvider) setCandlePair(candle BinanceCandle) {
p.mu.Lock()
defer p.mu.Unlock()
p.mtx.Lock()
defer p.mtx.Unlock()
p.candles[candle.Symbol] = candle
}

func (ticker BinanceTicker) toTickerPrice() (TickerPrice, error) {
return newTickerPrice("Binance", ticker.Symbol, ticker.LastPrice, ticker.Volume)
}

// subscribeTickers subscribe to all currency pairs
func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error {
params := make([]string, len(cps))

for i, cp := range cps {
params[i] = strings.ToLower(cp.String() + "@ticker")
}

subsMsg := newBinanceSubscriptionMsg(params...)
return p.wsClient.WriteJSON(subsMsg)
}

// subscribeCandles subscribe to all candles
func (p *BinanceProvider) subscribeCandles(cps ...types.CurrencyPair) error {
params := make([]string, len(cps))

for i, cp := range cps {
params[i] = strings.ToLower(cp.String() + "@kline_1m")
}

subsMsg := newBinanceSubscriptionMsg(params...)
return p.wsClient.WriteJSON(subsMsg)
}

func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(defaultMaxConnectionTime)
defer reconnectTicker.Stop()
Expand All @@ -213,7 +209,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
case <-time.After(defaultReadNewWSMessage):
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
// if some error occurs continue to try to read the next message
// if some error occurs continue to try to read the next message.
p.logger.Err(err).Msg("could not read message")
continue
}
Expand All @@ -233,13 +229,12 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
}
}

// reconnect closes the last WS connection and create a new one
// A single connection to stream.binance.com is only valid for 24 hours;
// expect to be disconnected at the 24 hour mark
// The websocket server will send a ping frame every 3 minutes.
// If the websocket server does not receive a pong frame back from
// the connection within a 10 minute period, the connection will be disconnected.
// Unsolicited pong frames are allowed.
// reconnect closes the last WS connection then create a new one and subscribe to
// all subscribed pairs in the ticker and candle pais. A single connection to
// stream.binance.com is only valid for 24 hours; expect to be disconnected at the
// 24 hour mark. The websocket server will send a ping frame every 3 minutes. If
// the websocket server does not receive a pong frame back from the connection
// within a 10 minute period, the connection will be disconnected.
func (p *BinanceProvider) reconnect() error {
p.wsClient.Close()

Expand All @@ -250,10 +245,16 @@ func (p *BinanceProvider) reconnect() error {
}
p.wsClient = wsConn

if err := p.subscribeCandles(p.subscribedPairs...); err != nil {
return err
pairs := make([]string, len(p.subscribedPairs)*2)
iterator := 0
for _, cp := range p.subscribedPairs {
pairs[iterator] = currencyPairToBinanceTickerPair(cp)
iterator++
pairs[iterator] = currencyPairToBinanceCandlePair(cp)
iterator++
}
return p.subscribeTickers(p.subscribedPairs...)

return p.subscribePairs(pairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in reconnect.
Expand All @@ -276,7 +277,35 @@ func (p *BinanceProvider) keepReconnecting() {
}
}

// newBinanceSubscriptionMsg returns a new subscription Msg
// setSubscribedPairs sets N currency pairs to the map of subscribed pairs.
func (p *BinanceProvider) setSubscribedPairs(cps ...types.CurrencyPair) {
p.mtx.Lock()
defer p.mtx.Unlock()

for _, cp := range cps {
p.subscribedPairs[cp.String()] = cp
}
}

// subscribePairs write the subscription msg to the provider.
func (p *BinanceProvider) subscribePairs(pairs ...string) error {
subsMsg := newBinanceSubscriptionMsg(pairs...)
return p.wsClient.WriteJSON(subsMsg)
}

// currencyPairToBinanceTickerPair receives a currency pair and return binance
// ticker symbol atomusdt@ticker.
func currencyPairToBinanceTickerPair(cp types.CurrencyPair) string {
return strings.ToLower(cp.String() + "@ticker")
}

// currencyPairToBinanceCandlePair receives a currency pair and return binance
// candle symbol atomusdt@kline_1m.
func currencyPairToBinanceCandlePair(cp types.CurrencyPair) string {
return strings.ToLower(cp.String() + "@kline_1m")
}

// newBinanceSubscriptionMsg returns a new subscription Msg.
func newBinanceSubscriptionMsg(params ...string) BinanceSubscriptionMsg {
return BinanceSubscriptionMsg{
Method: "SUBSCRIBE",
Expand Down
6 changes: 6 additions & 0 deletions price-feeder/oracle/provider/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,9 @@ func TestBinanceProvider_GetTickerPrices(t *testing.T) {
require.Nil(t, prices)
})
}

func TestBinanceCurrencyPairToBinancePair(t *testing.T) {
cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"}
binanceSymbol := currencyPairToBinanceTickerPair(cp)
require.Equal(t, binanceSymbol, "atomusdt@ticker")
}
Loading

0 comments on commit 380ec49

Please sign in to comment.